From 25a0f58d793630e9e4fc94c44a04928afe6b2edb Mon Sep 17 00:00:00 2001 From: David Heryanto Date: Tue, 25 Feb 2020 15:45:40 +0800 Subject: [PATCH] Extend WriteMetricsTransform in Ingestion to write feature value stats to StatsD (#486) * Extend WriteMetricsTransform to write feature value stats to StatsD * Apply mvn spotless * Catch all exception not just StatsDClientException during init Since there are other exception like UnknownHostException that can be thrown and we want to know such error. Also change the log level to error because so it's not normal for client to fail to be created" * Change log level due to invalid feature set ref to error (previously warn) On 2nd thought, this should constitute an error not a warning * Apply maven spotless to metric transform codes (cherry picked from commit 5508c9230c7359ceb761c0b71ac9924e0423fbb4) --- .prow/scripts/test-end-to-end.sh | 4 + ingestion/pom.xml | 7 + .../ingestion/options/ImportOptions.java | 10 + .../metrics/WriteFeatureValueMetricsDoFn.java | 311 +++++++++++++++++ .../metrics/WriteMetricsTransform.java | 41 +++ .../metrics/WriteRowMetricsDoFn.java | 14 +- .../WriteFeatureValueMetricsDoFnTest.java | 315 ++++++++++++++++++ .../WriteFeatureValueMetricsDoFnTest.README | 9 + .../WriteFeatureValueMetricsDoFnTest.input | 4 + .../WriteFeatureValueMetricsDoFnTest.output | 66 ++++ 10 files changed, 774 insertions(+), 7 deletions(-) create mode 100644 ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java create mode 100644 ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index 45cb9c82ae..b9d7fa9088 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -66,6 +66,7 @@ sleep 20 tail -n10 /var/log/kafka.log kafkacat -b localhost:9092 -L +if [[ ${SKIP_BUILD_JARS} != "true" ]]; then echo " ============================================================ Building jars for Feast @@ -81,6 +82,9 @@ mvn --quiet --batch-mode --define skipTests=true clean package ls -lh core/target/*jar ls -lh serving/target/*jar +else + echo "[DEBUG] Skipping building jars" +fi echo " ============================================================ diff --git a/ingestion/pom.xml b/ingestion/pom.xml index c829674a64..001da1a145 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -248,5 +248,12 @@ 2.8.1 + + + org.apache.commons + commons-math3 + 3.6.1 + + diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index 6afdd80dd7..c1bdcd5fd1 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -26,6 +26,7 @@ /** Options passed to Beam to influence the job's execution environment */ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { + @Required @Description( "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." @@ -83,4 +84,13 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, int getStatsdPort(); void setStatsdPort(int StatsdPort); + + @Description( + "Fixed window size in seconds (default 30) to apply before aggregation of numerical value of features" + + "and writing the aggregated value to StatsD. Refer to feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFn" + + "for details on the metric names and types.") + @Default.Integer(30) + int getWindowSizeInSecForFeatureValueMetric(); + + void setWindowSizeInSecForFeatureValueMetric(int seconds); } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java new file mode 100644 index 0000000000..8574d2414c --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java @@ -0,0 +1,311 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_VERSION_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY; + +import com.google.auto.value.AutoValue; +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import java.util.ArrayList; +import java.util.DoubleSummaryStatistics; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.commons.math3.stat.descriptive.rank.Percentile; +import org.slf4j.Logger; + +/** + * WriteFeatureValueMetricsDoFn accepts key value of FeatureSetRef(str) to FeatureRow(List) and + * writes a histogram of the numerical values of each feature to StatsD. + * + *

The histogram of the numerical values is represented as the following in StatsD: + * + *

+ * + *

StatsD timing/histogram metric type is not used since it does not support negative values. + */ +@AutoValue +public abstract class WriteFeatureValueMetricsDoFn + extends DoFn>, Void> { + + abstract String getStoreName(); + + abstract String getStatsdHost(); + + abstract int getStatsdPort(); + + static Builder newBuilder() { + return new AutoValue_WriteFeatureValueMetricsDoFn.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setStoreName(String storeName); + + abstract Builder setStatsdHost(String statsdHost); + + abstract Builder setStatsdPort(int statsdPort); + + abstract WriteFeatureValueMetricsDoFn build(); + } + + private static final Logger log = + org.slf4j.LoggerFactory.getLogger(WriteFeatureValueMetricsDoFn.class); + private StatsDClient statsDClient; + public static String GAUGE_NAME_FEATURE_VALUE_MIN = "feature_value_min"; + public static String GAUGE_NAME_FEATURE_VALUE_MAX = "feature_value_max"; + public static String GAUGE_NAME_FEATURE_VALUE_MEAN = "feature_value_mean"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50 = "feature_value_percentile_50"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90 = "feature_value_percentile_90"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95 = "feature_value_percentile_95"; + + @Setup + public void setup() { + // Note that exception may be thrown during StatsD client instantiation but no exception + // will be thrown when sending metrics (mimicking the UDP protocol behaviour). + // https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation + // https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support + try { + statsDClient = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort()); + } catch (Exception e) { + log.error("StatsD client cannot be started: " + e.getMessage()); + } + } + + @Teardown + public void tearDown() { + if (statsDClient != null) { + statsDClient.close(); + } + } + + @ProcessElement + public void processElement( + ProcessContext context, + @Element KV> featureSetRefToFeatureRows) { + if (statsDClient == null) { + return; + } + + String featureSetRef = featureSetRefToFeatureRows.getKey(); + if (featureSetRef == null) { + return; + } + String[] colonSplits = featureSetRef.split(":"); + if (colonSplits.length != 2) { + log.error( + "Skip writing feature value metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String[] slashSplits = colonSplits[0].split("/"); + if (slashSplits.length != 2) { + log.error( + "Skip writing feature value metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String projectName = slashSplits[0]; + String featureSetName = slashSplits[1]; + String version = colonSplits[1]; + + Map featureNameToStats = new HashMap<>(); + Map> featureNameToValues = new HashMap<>(); + for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) { + for (Field field : featureRow.getFieldsList()) { + updateStats(featureNameToStats, featureNameToValues, field); + } + } + + for (Entry entry : featureNameToStats.entrySet()) { + String featureName = entry.getKey(); + DoubleSummaryStatistics stats = entry.getValue(); + String[] tags = { + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName, + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + version, + FEATURE_TAG_KEY + ":" + featureName, + INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName() + }; + + // stats can return non finite values when there is no element + // or there is an element that is not a number. Metric should only be sent for finite values. + if (Double.isFinite(stats.getMin())) { + if (stats.getMin() < 0) { + // StatsD gauge will asssign a delta instead of the actual value, if there is a sign in + // the value. E.g. if the value is negative, a delta will be assigned. For this reason, + // the gauge value is set to zero beforehand. + // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#gauges + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, stats.getMin(), tags); + } + if (Double.isFinite(stats.getMax())) { + if (stats.getMax() < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, stats.getMax(), tags); + } + if (Double.isFinite(stats.getAverage())) { + if (stats.getAverage() < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, stats.getAverage(), tags); + } + + // For percentile calculation, Percentile class from commons-math3 from Apache is used. + // Percentile requires double[], hence the conversion below. + if (!featureNameToValues.containsKey(featureName)) { + continue; + } + List valueList = featureNameToValues.get(featureName); + if (valueList == null || valueList.size() < 1) { + continue; + } + double[] values = new double[valueList.size()]; + for (int i = 0; i < values.length; i++) { + values[i] = valueList.get(i); + } + + double p50 = new Percentile().evaluate(values, 50); + if (p50 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, p50, tags); + + double p90 = new Percentile().evaluate(values, 90); + if (p90 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, p90, tags); + + double p95 = new Percentile().evaluate(values, 95); + if (p95 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, p95, tags); + } + } + + // Update stats and values array for the feature represented by the field. + // If the field contains non-numerical or non-boolean value, the stats and values array + // won't get updated because we are only concerned with numerical value in metrics data. + // For boolean value, true and false are treated as numerical value of 1 of 0 respectively. + private void updateStats( + Map featureNameToStats, + Map> featureNameToValues, + Field field) { + if (featureNameToStats == null || featureNameToValues == null || field == null) { + return; + } + + String featureName = field.getName(); + if (!featureNameToStats.containsKey(featureName)) { + featureNameToStats.put(featureName, new DoubleSummaryStatistics()); + } + if (!featureNameToValues.containsKey(featureName)) { + featureNameToValues.put(featureName, new ArrayList<>()); + } + + Value value = field.getValue(); + DoubleSummaryStatistics stats = featureNameToStats.get(featureName); + List values = featureNameToValues.get(featureName); + + switch (value.getValCase()) { + case INT32_VAL: + stats.accept(value.getInt32Val()); + values.add(((double) value.getInt32Val())); + break; + case INT64_VAL: + stats.accept(value.getInt64Val()); + values.add((double) value.getInt64Val()); + break; + case DOUBLE_VAL: + stats.accept(value.getDoubleVal()); + values.add(value.getDoubleVal()); + break; + case FLOAT_VAL: + stats.accept(value.getFloatVal()); + values.add((double) value.getFloatVal()); + break; + case BOOL_VAL: + stats.accept(value.getBoolVal() ? 1 : 0); + values.add(value.getBoolVal() ? 1d : 0d); + break; + case INT32_LIST_VAL: + for (Integer val : value.getInt32ListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case INT64_LIST_VAL: + for (Long val : value.getInt64ListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case DOUBLE_LIST_VAL: + for (Double val : value.getDoubleListVal().getValList()) { + stats.accept(val); + values.add(val); + } + break; + case FLOAT_LIST_VAL: + for (Float val : value.getFloatListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case BOOL_LIST_VAL: + for (Boolean val : value.getBoolListVal().getValList()) { + stats.accept(val ? 1 : 0); + values.add(val ? 1d : 0d); + } + break; + case BYTES_VAL: + case BYTES_LIST_VAL: + case STRING_VAL: + case STRING_LIST_VAL: + case VAL_NOT_SET: + default: + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index 43f314aa86..10322ac812 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -21,11 +21,16 @@ import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; @AutoValue public abstract class WriteMetricsTransform extends PTransform { @@ -79,6 +84,42 @@ public PDone expand(PCollectionTuple input) { .setStoreName(getStoreName()) .build())); + // 1. Apply a fixed window + // 2. Group feature row by feature set reference + // 3. Calculate min, max, mean, percentiles of numerical values of features in the window + // and + // 4. Send the aggregate value to StatsD metric collector. + // + // NOTE: window is applied here so the metric collector will not be overwhelmed with + // metrics data. And for metric data, only statistic of the values are usually required + // vs the actual values. + input + .get(getSuccessTag()) + .apply( + "FixedWindow", + Window.into( + FixedWindows.of( + Duration.standardSeconds( + options.getWindowSizeInSecForFeatureValueMetric())))) + .apply( + "ConvertTo_FeatureSetRefToFeatureRow", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c, @Element FeatureRow featureRow) { + c.output(KV.of(featureRow.getFeatureSet(), featureRow)); + } + })) + .apply("GroupByFeatureSetRef", GroupByKey.create()) + .apply( + "WriteFeatureValueMetrics", + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .build())); + return PDone.in(input.getPipeline()); case "none": default: diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index db2d1acd6d..2cd1ee94ec 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -31,13 +31,13 @@ public abstract class WriteRowMetricsDoFn extends DoFn { private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class); - private final String METRIC_PREFIX = "feast_ingestion"; - private final String STORE_TAG_KEY = "feast_store"; - private final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name"; - private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; - private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; - private final String FEATURE_TAG_KEY = "feast_feature_name"; - private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; + public static final String METRIC_PREFIX = "feast_ingestion"; + public static final String STORE_TAG_KEY = "feast_store"; + public static final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name"; + public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; + public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; + public static final String FEATURE_TAG_KEY = "feast_feature_name"; + public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; public abstract String getStoreName(); diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java new file mode 100644 index 0000000000..8f0adf4016 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java @@ -0,0 +1,315 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FeatureRowProto.FeatureRow.Builder; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.BoolList; +import feast.types.ValueProto.BytesList; +import feast.types.ValueProto.DoubleList; +import feast.types.ValueProto.FloatList; +import feast.types.ValueProto.Int32List; +import feast.types.ValueProto.Int64List; +import feast.types.ValueProto.StringList; +import feast.types.ValueProto.Value; +import java.io.BufferedReader; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.junit.Rule; +import org.junit.Test; + +public class WriteFeatureValueMetricsDoFnTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static final int STATSD_SERVER_PORT = 17254; + private final DummyStatsDServer statsDServer = new DummyStatsDServer(STATSD_SERVER_PORT); + + @Test + public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedException { + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + pipelineOptions.setJobName("job"); + + Map> input = + readTestInput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input"); + List expectedLines = + readTestOutput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output"); + + pipeline + .apply(Create.of(input)) + .apply( + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost("localhost") + .setStatsdPort(STATSD_SERVER_PORT) + .setStoreName("store") + .build())); + pipeline.run(pipelineOptions).waitUntilFinish(); + // Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration + // based on empirical testing. + Thread.sleep(3000); + + List actualLines = statsDServer.messagesReceived(); + for (String expected : expectedLines) { + boolean matched = false; + for (String actual : actualLines) { + if (actual.equals(expected)) { + matched = true; + break; + } + } + if (!matched) { + System.out.println("Print actual metrics output for debugging:"); + for (String line : actualLines) { + System.out.println(line); + } + fail(String.format("Expected StatsD metric not found:\n%s", expected)); + } + } + } + + // Test utility method to read expected StatsD metrics output from a text file. + @SuppressWarnings("SameParameterValue") + private List readTestOutput(String path) throws IOException { + URL url = Thread.currentThread().getContextClassLoader().getResource(path); + if (url == null) { + throw new IllegalArgumentException( + "cannot read test data, path contains null url. Path: " + path); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) { + String line = reader.readLine(); + while (line != null) { + if (line.trim().length() > 1) { + lines.add(line); + } + line = reader.readLine(); + } + } + return lines; + } + + // Test utility method to create test feature row data from a text file. + @SuppressWarnings("SameParameterValue") + private Map> readTestInput(String path) throws IOException { + Map> data = new HashMap<>(); + URL url = Thread.currentThread().getContextClassLoader().getResource(path); + if (url == null) { + throw new IllegalArgumentException( + "cannot read test data, path contains null url. Path: " + path); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) { + String line = reader.readLine(); + while (line != null) { + lines.add(line); + line = reader.readLine(); + } + } + List colNames = new ArrayList<>(); + for (String line : lines) { + if (line.strip().length() < 1) { + continue; + } + String[] splits = line.split(","); + colNames.addAll(Arrays.asList(splits)); + + if (line.startsWith("featuresetref")) { + // Header line + colNames.addAll(Arrays.asList(splits).subList(1, splits.length)); + continue; + } + + Builder featureRowBuilder = FeatureRow.newBuilder(); + for (int i = 0; i < splits.length; i++) { + String colVal = splits[i].strip(); + if (i == 0) { + featureRowBuilder.setFeatureSet(colVal); + continue; + } + String colName = colNames.get(i); + Field.Builder fieldBuilder = Field.newBuilder().setName(colName); + if (!colVal.isEmpty()) { + switch (colName) { + case "int32": + fieldBuilder.setValue(Value.newBuilder().setInt32Val((Integer.parseInt(colVal)))); + break; + case "int64": + fieldBuilder.setValue(Value.newBuilder().setInt64Val((Long.parseLong(colVal)))); + break; + case "double": + fieldBuilder.setValue(Value.newBuilder().setDoubleVal((Double.parseDouble(colVal)))); + break; + case "float": + fieldBuilder.setValue(Value.newBuilder().setFloatVal((Float.parseFloat(colVal)))); + break; + case "bool": + fieldBuilder.setValue(Value.newBuilder().setBoolVal((Boolean.parseBoolean(colVal)))); + break; + case "int32list": + List int32List = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + int32List.add(Integer.parseInt(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setInt32ListVal(Int32List.newBuilder().addAllVal(int32List))); + break; + case "int64list": + List int64list = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + int64list.add(Long.parseLong(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setInt64ListVal(Int64List.newBuilder().addAllVal(int64list))); + break; + case "doublelist": + List doubleList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + doubleList.add(Double.parseDouble(val)); + } + fieldBuilder.setValue( + Value.newBuilder() + .setDoubleListVal(DoubleList.newBuilder().addAllVal(doubleList))); + break; + case "floatlist": + List floatList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + floatList.add(Float.parseFloat(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setFloatListVal(FloatList.newBuilder().addAllVal(floatList))); + break; + case "boollist": + List boolList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + boolList.add(Boolean.parseBoolean(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setBoolListVal(BoolList.newBuilder().addAllVal(boolList))); + break; + case "bytes": + fieldBuilder.setValue( + Value.newBuilder().setBytesVal(ByteString.copyFromUtf8("Dummy"))); + break; + case "byteslist": + fieldBuilder.setValue( + Value.newBuilder().setBytesListVal(BytesList.getDefaultInstance())); + break; + case "string": + fieldBuilder.setValue(Value.newBuilder().setStringVal("Dummy")); + break; + case "stringlist": + fieldBuilder.setValue( + Value.newBuilder().setStringListVal(StringList.getDefaultInstance())); + break; + } + } + featureRowBuilder.addFields(fieldBuilder); + } + + if (!data.containsKey(featureRowBuilder.getFeatureSet())) { + data.put(featureRowBuilder.getFeatureSet(), new ArrayList<>()); + } + List featureRowsByFeatureSetRef = data.get(featureRowBuilder.getFeatureSet()); + featureRowsByFeatureSetRef.add(featureRowBuilder.build()); + } + + // Convert List to Iterable to match the function signature in + // WriteFeatureValueMetricsDoFn + Map> dataWithIterable = new HashMap<>(); + for (Entry> entrySet : data.entrySet()) { + String key = entrySet.getKey(); + Iterable value = entrySet.getValue(); + dataWithIterable.put(key, value); + } + return dataWithIterable; + } + + // Modified version of + // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java + @SuppressWarnings("CatchMayIgnoreException") + private static final class DummyStatsDServer { + + private final List messagesReceived = new ArrayList(); + private final DatagramSocket server; + + public DummyStatsDServer(int port) { + try { + server = new DatagramSocket(port); + } catch (SocketException e) { + throw new IllegalStateException(e); + } + new Thread( + () -> { + try { + while (true) { + final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535); + server.receive(packet); + messagesReceived.add( + new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n"); + Thread.sleep(50); + } + + } catch (Exception e) { + } + }) + .start(); + } + + public void stop() { + server.close(); + } + + public void waitForMessage() { + while (messagesReceived.isEmpty()) { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + } + } + } + + public List messagesReceived() { + List out = new ArrayList<>(); + for (String msg : messagesReceived) { + String[] lines = msg.split("\n"); + out.addAll(Arrays.asList(lines)); + } + return out; + } + } +} diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README new file mode 100644 index 0000000000..3c8759d170 --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README @@ -0,0 +1,9 @@ +WriteFeatureValueMetricsDoFnTest.input file contains data that can be read by test utility +into map of FeatureSetRef -> [FeatureRow]. In the first row, the cell value corresponds to the +field name in the FeatureRow. This should not be changed as the test utility derives the value +type from this name. Empty value in the cell is a value that is not set. For list type, the values +of different element is separated by the '|' character. + +WriteFeatureValueMetricsDoFnTest.output file contains lines of expected StatsD metrics that should +be sent when WriteFeatureValueMetricsDoFn runs. It can be checked against the actual outputted +StatsD metrics to test for correctness. diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input new file mode 100644 index 0000000000..d2985711ce --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input @@ -0,0 +1,4 @@ +featuresetref,int32,int64,double,float,bool,int32list,int64list,doublelist,floatlist,boollist,bytes,byteslist,string,stringlist +project/featureset:1,1,5,8,5,true,1|4|3,5|1|12,5|7|3,-2.0,true|false,,,, +project/featureset:1,5,-10,8,10.0,true,1|12|5,,,-1.0|-3.0,false|true,,,, +project/featureset:1,6,-4,8,0.0,true,2,2|5,,,true|false,,,, \ No newline at end of file diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output new file mode 100644 index 0000000000..63bc7bbfa4 --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output @@ -0,0 +1,66 @@ +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_min:-10|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:-3|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:-4|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:3|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:3|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_min:-3|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store \ No newline at end of file