diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index 45cb9c82ae..b9d7fa9088 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -66,6 +66,7 @@ sleep 20 tail -n10 /var/log/kafka.log kafkacat -b localhost:9092 -L +if [[ ${SKIP_BUILD_JARS} != "true" ]]; then echo " ============================================================ Building jars for Feast @@ -81,6 +82,9 @@ mvn --quiet --batch-mode --define skipTests=true clean package ls -lh core/target/*jar ls -lh serving/target/*jar +else + echo "[DEBUG] Skipping building jars" +fi echo " ============================================================ diff --git a/ingestion/pom.xml b/ingestion/pom.xml index c829674a64..001da1a145 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -248,5 +248,12 @@ 2.8.1 + + + org.apache.commons + commons-math3 + 3.6.1 + + diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index 6afdd80dd7..c1bdcd5fd1 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -26,6 +26,7 @@ /** Options passed to Beam to influence the job's execution environment */ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { + @Required @Description( "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." @@ -83,4 +84,13 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, int getStatsdPort(); void setStatsdPort(int StatsdPort); + + @Description( + "Fixed window size in seconds (default 30) to apply before aggregation of numerical value of features" + + "and writing the aggregated value to StatsD. Refer to feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFn" + + "for details on the metric names and types.") + @Default.Integer(30) + int getWindowSizeInSecForFeatureValueMetric(); + + void setWindowSizeInSecForFeatureValueMetric(int seconds); } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java new file mode 100644 index 0000000000..8574d2414c --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java @@ -0,0 +1,311 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_VERSION_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY; + +import com.google.auto.value.AutoValue; +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import java.util.ArrayList; +import java.util.DoubleSummaryStatistics; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.commons.math3.stat.descriptive.rank.Percentile; +import org.slf4j.Logger; + +/** + * WriteFeatureValueMetricsDoFn accepts key value of FeatureSetRef(str) to FeatureRow(List) and + * writes a histogram of the numerical values of each feature to StatsD. + * + *

The histogram of the numerical values is represented as the following in StatsD: + * + *

+ * + *

StatsD timing/histogram metric type is not used since it does not support negative values. + */ +@AutoValue +public abstract class WriteFeatureValueMetricsDoFn + extends DoFn>, Void> { + + abstract String getStoreName(); + + abstract String getStatsdHost(); + + abstract int getStatsdPort(); + + static Builder newBuilder() { + return new AutoValue_WriteFeatureValueMetricsDoFn.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setStoreName(String storeName); + + abstract Builder setStatsdHost(String statsdHost); + + abstract Builder setStatsdPort(int statsdPort); + + abstract WriteFeatureValueMetricsDoFn build(); + } + + private static final Logger log = + org.slf4j.LoggerFactory.getLogger(WriteFeatureValueMetricsDoFn.class); + private StatsDClient statsDClient; + public static String GAUGE_NAME_FEATURE_VALUE_MIN = "feature_value_min"; + public static String GAUGE_NAME_FEATURE_VALUE_MAX = "feature_value_max"; + public static String GAUGE_NAME_FEATURE_VALUE_MEAN = "feature_value_mean"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50 = "feature_value_percentile_50"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90 = "feature_value_percentile_90"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95 = "feature_value_percentile_95"; + + @Setup + public void setup() { + // Note that exception may be thrown during StatsD client instantiation but no exception + // will be thrown when sending metrics (mimicking the UDP protocol behaviour). + // https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation + // https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support + try { + statsDClient = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort()); + } catch (Exception e) { + log.error("StatsD client cannot be started: " + e.getMessage()); + } + } + + @Teardown + public void tearDown() { + if (statsDClient != null) { + statsDClient.close(); + } + } + + @ProcessElement + public void processElement( + ProcessContext context, + @Element KV> featureSetRefToFeatureRows) { + if (statsDClient == null) { + return; + } + + String featureSetRef = featureSetRefToFeatureRows.getKey(); + if (featureSetRef == null) { + return; + } + String[] colonSplits = featureSetRef.split(":"); + if (colonSplits.length != 2) { + log.error( + "Skip writing feature value metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String[] slashSplits = colonSplits[0].split("/"); + if (slashSplits.length != 2) { + log.error( + "Skip writing feature value metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String projectName = slashSplits[0]; + String featureSetName = slashSplits[1]; + String version = colonSplits[1]; + + Map featureNameToStats = new HashMap<>(); + Map> featureNameToValues = new HashMap<>(); + for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) { + for (Field field : featureRow.getFieldsList()) { + updateStats(featureNameToStats, featureNameToValues, field); + } + } + + for (Entry entry : featureNameToStats.entrySet()) { + String featureName = entry.getKey(); + DoubleSummaryStatistics stats = entry.getValue(); + String[] tags = { + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName, + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + version, + FEATURE_TAG_KEY + ":" + featureName, + INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName() + }; + + // stats can return non finite values when there is no element + // or there is an element that is not a number. Metric should only be sent for finite values. + if (Double.isFinite(stats.getMin())) { + if (stats.getMin() < 0) { + // StatsD gauge will asssign a delta instead of the actual value, if there is a sign in + // the value. E.g. if the value is negative, a delta will be assigned. For this reason, + // the gauge value is set to zero beforehand. + // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#gauges + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, stats.getMin(), tags); + } + if (Double.isFinite(stats.getMax())) { + if (stats.getMax() < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, stats.getMax(), tags); + } + if (Double.isFinite(stats.getAverage())) { + if (stats.getAverage() < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, stats.getAverage(), tags); + } + + // For percentile calculation, Percentile class from commons-math3 from Apache is used. + // Percentile requires double[], hence the conversion below. + if (!featureNameToValues.containsKey(featureName)) { + continue; + } + List valueList = featureNameToValues.get(featureName); + if (valueList == null || valueList.size() < 1) { + continue; + } + double[] values = new double[valueList.size()]; + for (int i = 0; i < values.length; i++) { + values[i] = valueList.get(i); + } + + double p50 = new Percentile().evaluate(values, 50); + if (p50 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, p50, tags); + + double p90 = new Percentile().evaluate(values, 90); + if (p90 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, p90, tags); + + double p95 = new Percentile().evaluate(values, 95); + if (p95 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, p95, tags); + } + } + + // Update stats and values array for the feature represented by the field. + // If the field contains non-numerical or non-boolean value, the stats and values array + // won't get updated because we are only concerned with numerical value in metrics data. + // For boolean value, true and false are treated as numerical value of 1 of 0 respectively. + private void updateStats( + Map featureNameToStats, + Map> featureNameToValues, + Field field) { + if (featureNameToStats == null || featureNameToValues == null || field == null) { + return; + } + + String featureName = field.getName(); + if (!featureNameToStats.containsKey(featureName)) { + featureNameToStats.put(featureName, new DoubleSummaryStatistics()); + } + if (!featureNameToValues.containsKey(featureName)) { + featureNameToValues.put(featureName, new ArrayList<>()); + } + + Value value = field.getValue(); + DoubleSummaryStatistics stats = featureNameToStats.get(featureName); + List values = featureNameToValues.get(featureName); + + switch (value.getValCase()) { + case INT32_VAL: + stats.accept(value.getInt32Val()); + values.add(((double) value.getInt32Val())); + break; + case INT64_VAL: + stats.accept(value.getInt64Val()); + values.add((double) value.getInt64Val()); + break; + case DOUBLE_VAL: + stats.accept(value.getDoubleVal()); + values.add(value.getDoubleVal()); + break; + case FLOAT_VAL: + stats.accept(value.getFloatVal()); + values.add((double) value.getFloatVal()); + break; + case BOOL_VAL: + stats.accept(value.getBoolVal() ? 1 : 0); + values.add(value.getBoolVal() ? 1d : 0d); + break; + case INT32_LIST_VAL: + for (Integer val : value.getInt32ListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case INT64_LIST_VAL: + for (Long val : value.getInt64ListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case DOUBLE_LIST_VAL: + for (Double val : value.getDoubleListVal().getValList()) { + stats.accept(val); + values.add(val); + } + break; + case FLOAT_LIST_VAL: + for (Float val : value.getFloatListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case BOOL_LIST_VAL: + for (Boolean val : value.getBoolListVal().getValList()) { + stats.accept(val ? 1 : 0); + values.add(val ? 1d : 0d); + } + break; + case BYTES_VAL: + case BYTES_LIST_VAL: + case STRING_VAL: + case STRING_LIST_VAL: + case VAL_NOT_SET: + default: + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index 43f314aa86..10322ac812 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -21,11 +21,16 @@ import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; @AutoValue public abstract class WriteMetricsTransform extends PTransform { @@ -79,6 +84,42 @@ public PDone expand(PCollectionTuple input) { .setStoreName(getStoreName()) .build())); + // 1. Apply a fixed window + // 2. Group feature row by feature set reference + // 3. Calculate min, max, mean, percentiles of numerical values of features in the window + // and + // 4. Send the aggregate value to StatsD metric collector. + // + // NOTE: window is applied here so the metric collector will not be overwhelmed with + // metrics data. And for metric data, only statistic of the values are usually required + // vs the actual values. + input + .get(getSuccessTag()) + .apply( + "FixedWindow", + Window.into( + FixedWindows.of( + Duration.standardSeconds( + options.getWindowSizeInSecForFeatureValueMetric())))) + .apply( + "ConvertTo_FeatureSetRefToFeatureRow", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c, @Element FeatureRow featureRow) { + c.output(KV.of(featureRow.getFeatureSet(), featureRow)); + } + })) + .apply("GroupByFeatureSetRef", GroupByKey.create()) + .apply( + "WriteFeatureValueMetrics", + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .build())); + return PDone.in(input.getPipeline()); case "none": default: diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index db2d1acd6d..2cd1ee94ec 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -31,13 +31,13 @@ public abstract class WriteRowMetricsDoFn extends DoFn { private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class); - private final String METRIC_PREFIX = "feast_ingestion"; - private final String STORE_TAG_KEY = "feast_store"; - private final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name"; - private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; - private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; - private final String FEATURE_TAG_KEY = "feast_feature_name"; - private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; + public static final String METRIC_PREFIX = "feast_ingestion"; + public static final String STORE_TAG_KEY = "feast_store"; + public static final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name"; + public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; + public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; + public static final String FEATURE_TAG_KEY = "feast_feature_name"; + public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; public abstract String getStoreName(); diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java new file mode 100644 index 0000000000..8f0adf4016 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java @@ -0,0 +1,315 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FeatureRowProto.FeatureRow.Builder; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.BoolList; +import feast.types.ValueProto.BytesList; +import feast.types.ValueProto.DoubleList; +import feast.types.ValueProto.FloatList; +import feast.types.ValueProto.Int32List; +import feast.types.ValueProto.Int64List; +import feast.types.ValueProto.StringList; +import feast.types.ValueProto.Value; +import java.io.BufferedReader; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.junit.Rule; +import org.junit.Test; + +public class WriteFeatureValueMetricsDoFnTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static final int STATSD_SERVER_PORT = 17254; + private final DummyStatsDServer statsDServer = new DummyStatsDServer(STATSD_SERVER_PORT); + + @Test + public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedException { + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + pipelineOptions.setJobName("job"); + + Map> input = + readTestInput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input"); + List expectedLines = + readTestOutput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output"); + + pipeline + .apply(Create.of(input)) + .apply( + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost("localhost") + .setStatsdPort(STATSD_SERVER_PORT) + .setStoreName("store") + .build())); + pipeline.run(pipelineOptions).waitUntilFinish(); + // Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration + // based on empirical testing. + Thread.sleep(3000); + + List actualLines = statsDServer.messagesReceived(); + for (String expected : expectedLines) { + boolean matched = false; + for (String actual : actualLines) { + if (actual.equals(expected)) { + matched = true; + break; + } + } + if (!matched) { + System.out.println("Print actual metrics output for debugging:"); + for (String line : actualLines) { + System.out.println(line); + } + fail(String.format("Expected StatsD metric not found:\n%s", expected)); + } + } + } + + // Test utility method to read expected StatsD metrics output from a text file. + @SuppressWarnings("SameParameterValue") + private List readTestOutput(String path) throws IOException { + URL url = Thread.currentThread().getContextClassLoader().getResource(path); + if (url == null) { + throw new IllegalArgumentException( + "cannot read test data, path contains null url. Path: " + path); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) { + String line = reader.readLine(); + while (line != null) { + if (line.trim().length() > 1) { + lines.add(line); + } + line = reader.readLine(); + } + } + return lines; + } + + // Test utility method to create test feature row data from a text file. + @SuppressWarnings("SameParameterValue") + private Map> readTestInput(String path) throws IOException { + Map> data = new HashMap<>(); + URL url = Thread.currentThread().getContextClassLoader().getResource(path); + if (url == null) { + throw new IllegalArgumentException( + "cannot read test data, path contains null url. Path: " + path); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) { + String line = reader.readLine(); + while (line != null) { + lines.add(line); + line = reader.readLine(); + } + } + List colNames = new ArrayList<>(); + for (String line : lines) { + if (line.strip().length() < 1) { + continue; + } + String[] splits = line.split(","); + colNames.addAll(Arrays.asList(splits)); + + if (line.startsWith("featuresetref")) { + // Header line + colNames.addAll(Arrays.asList(splits).subList(1, splits.length)); + continue; + } + + Builder featureRowBuilder = FeatureRow.newBuilder(); + for (int i = 0; i < splits.length; i++) { + String colVal = splits[i].strip(); + if (i == 0) { + featureRowBuilder.setFeatureSet(colVal); + continue; + } + String colName = colNames.get(i); + Field.Builder fieldBuilder = Field.newBuilder().setName(colName); + if (!colVal.isEmpty()) { + switch (colName) { + case "int32": + fieldBuilder.setValue(Value.newBuilder().setInt32Val((Integer.parseInt(colVal)))); + break; + case "int64": + fieldBuilder.setValue(Value.newBuilder().setInt64Val((Long.parseLong(colVal)))); + break; + case "double": + fieldBuilder.setValue(Value.newBuilder().setDoubleVal((Double.parseDouble(colVal)))); + break; + case "float": + fieldBuilder.setValue(Value.newBuilder().setFloatVal((Float.parseFloat(colVal)))); + break; + case "bool": + fieldBuilder.setValue(Value.newBuilder().setBoolVal((Boolean.parseBoolean(colVal)))); + break; + case "int32list": + List int32List = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + int32List.add(Integer.parseInt(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setInt32ListVal(Int32List.newBuilder().addAllVal(int32List))); + break; + case "int64list": + List int64list = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + int64list.add(Long.parseLong(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setInt64ListVal(Int64List.newBuilder().addAllVal(int64list))); + break; + case "doublelist": + List doubleList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + doubleList.add(Double.parseDouble(val)); + } + fieldBuilder.setValue( + Value.newBuilder() + .setDoubleListVal(DoubleList.newBuilder().addAllVal(doubleList))); + break; + case "floatlist": + List floatList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + floatList.add(Float.parseFloat(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setFloatListVal(FloatList.newBuilder().addAllVal(floatList))); + break; + case "boollist": + List boolList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + boolList.add(Boolean.parseBoolean(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setBoolListVal(BoolList.newBuilder().addAllVal(boolList))); + break; + case "bytes": + fieldBuilder.setValue( + Value.newBuilder().setBytesVal(ByteString.copyFromUtf8("Dummy"))); + break; + case "byteslist": + fieldBuilder.setValue( + Value.newBuilder().setBytesListVal(BytesList.getDefaultInstance())); + break; + case "string": + fieldBuilder.setValue(Value.newBuilder().setStringVal("Dummy")); + break; + case "stringlist": + fieldBuilder.setValue( + Value.newBuilder().setStringListVal(StringList.getDefaultInstance())); + break; + } + } + featureRowBuilder.addFields(fieldBuilder); + } + + if (!data.containsKey(featureRowBuilder.getFeatureSet())) { + data.put(featureRowBuilder.getFeatureSet(), new ArrayList<>()); + } + List featureRowsByFeatureSetRef = data.get(featureRowBuilder.getFeatureSet()); + featureRowsByFeatureSetRef.add(featureRowBuilder.build()); + } + + // Convert List to Iterable to match the function signature in + // WriteFeatureValueMetricsDoFn + Map> dataWithIterable = new HashMap<>(); + for (Entry> entrySet : data.entrySet()) { + String key = entrySet.getKey(); + Iterable value = entrySet.getValue(); + dataWithIterable.put(key, value); + } + return dataWithIterable; + } + + // Modified version of + // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java + @SuppressWarnings("CatchMayIgnoreException") + private static final class DummyStatsDServer { + + private final List messagesReceived = new ArrayList(); + private final DatagramSocket server; + + public DummyStatsDServer(int port) { + try { + server = new DatagramSocket(port); + } catch (SocketException e) { + throw new IllegalStateException(e); + } + new Thread( + () -> { + try { + while (true) { + final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535); + server.receive(packet); + messagesReceived.add( + new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n"); + Thread.sleep(50); + } + + } catch (Exception e) { + } + }) + .start(); + } + + public void stop() { + server.close(); + } + + public void waitForMessage() { + while (messagesReceived.isEmpty()) { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + } + } + } + + public List messagesReceived() { + List out = new ArrayList<>(); + for (String msg : messagesReceived) { + String[] lines = msg.split("\n"); + out.addAll(Arrays.asList(lines)); + } + return out; + } + } +} diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README new file mode 100644 index 0000000000..3c8759d170 --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README @@ -0,0 +1,9 @@ +WriteFeatureValueMetricsDoFnTest.input file contains data that can be read by test utility +into map of FeatureSetRef -> [FeatureRow]. In the first row, the cell value corresponds to the +field name in the FeatureRow. This should not be changed as the test utility derives the value +type from this name. Empty value in the cell is a value that is not set. For list type, the values +of different element is separated by the '|' character. + +WriteFeatureValueMetricsDoFnTest.output file contains lines of expected StatsD metrics that should +be sent when WriteFeatureValueMetricsDoFn runs. It can be checked against the actual outputted +StatsD metrics to test for correctness. diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input new file mode 100644 index 0000000000..d2985711ce --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input @@ -0,0 +1,4 @@ +featuresetref,int32,int64,double,float,bool,int32list,int64list,doublelist,floatlist,boollist,bytes,byteslist,string,stringlist +project/featureset:1,1,5,8,5,true,1|4|3,5|1|12,5|7|3,-2.0,true|false,,,, +project/featureset:1,5,-10,8,10.0,true,1|12|5,,,-1.0|-3.0,false|true,,,, +project/featureset:1,6,-4,8,0.0,true,2,2|5,,,true|false,,,, \ No newline at end of file diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output new file mode 100644 index 0000000000..63bc7bbfa4 --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output @@ -0,0 +1,66 @@ +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_min:-10|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:-3|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:-4|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:3|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:3|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_min:-3|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store \ No newline at end of file