Skip to content

Commit

Permalink
Add Redis storage implementation (#547)
Browse files Browse the repository at this point in the history
* Add Redis storage

* Remove staleness check; can be checked at the service level

* Remove staleness related tests

* Add dependencies to top level pom

* Clean up code
  • Loading branch information
Chen Zhiling committed Apr 7, 2020
1 parent c4844eb commit 8b00ad9
Show file tree
Hide file tree
Showing 12 changed files with 1,772 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.common.retry;

import java.io.Serializable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

public class BackOffExecutor implements Serializable {

private final Integer maxRetries;
private final Duration initialBackOff;

public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
this.maxRetries = maxRetries;
this.initialBackOff = initialBackOff;
}

public void execute(Retriable retriable) throws Exception {
FluentBackoff backoff =
FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
execute(retriable, backoff);
}

private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff = backoff.backoff();
while (true) {
try {
retriable.execute();
break;
} catch (Exception e) {
if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
retriable.cleanUpAfterFailure();
} else {
throw e;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.common.retry;

public interface Retriable {
void execute() throws Exception;

Boolean isExceptionRetriable(Exception e);

void cleanUpAfterFailure();
}
58 changes: 58 additions & 0 deletions storage/connectors/redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,64 @@
<name>Feast Storage Connector for Redis</name>

<dependencies>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>1.6.6</version>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.6.6</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.23.0</version>
<scope>test</scope>
</dependency>

<!-- To run actual Redis for ingestion integration test -->
<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${org.apache.beam.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.redis.retrieval;

import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto.Field;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FeatureRowDecoder {

private final String featureSetRef;
private final FeatureSetSpec spec;

public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) {
this.featureSetRef = featureSetRef;
this.spec = spec;
}

/**
* A feature row is considered encoded if the feature set and field names are not set. This method
* is required for backward compatibility purposes, to allow Feast serving to continue serving non
* encoded Feature Row ingested by an older version of Feast.
*
* @param featureRow Feature row
* @return boolean
*/
public Boolean isEncoded(FeatureRow featureRow) {
return featureRow.getFeatureSet().isEmpty()
&& featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty());
}

/**
* Validates if an encoded feature row can be decoded without exception.
*
* @param featureRow Feature row
* @return boolean
*/
public Boolean isEncodingValid(FeatureRow featureRow) {
return featureRow.getFieldsList().size() == spec.getFeaturesList().size();
}

/**
* Decoding feature row by repopulating the field names based on the corresponding feature set
* spec.
*
* @param encodedFeatureRow Feature row
* @return boolean
*/
public FeatureRow decode(FeatureRow encodedFeatureRow) {
final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();

List<String> featureNames =
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpec::getName))
.map(FeatureSpec::getName)
.collect(Collectors.toList());
List<Field> fields =
IntStream.range(0, featureNames.size())
.mapToObj(
featureNameIndex -> {
String featureName = featureNames.get(featureNameIndex);
return fieldsWithoutName
.get(featureNameIndex)
.toBuilder()
.setName(featureName)
.build();
})
.collect(Collectors.toList());
return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
}
}
Loading

0 comments on commit 8b00ad9

Please sign in to comment.