From 96aebdee648b505a5f09ddd2fb54655f1a186efc Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Sun, 23 Feb 2020 00:19:53 +0800 Subject: [PATCH 1/8] Replacing Jedis With Lettuce in ingestion and serving --- ingestion/pom.xml | 4 +- .../java/feast/ingestion/utils/StoreUtil.java | 14 ++-- .../src/main/java/feast/retry/Retriable.java | 2 +- .../redis/LettuceTransactionPipeline.java | 82 +++++++++++++++++++ .../store/serving/redis/RedisCustomIO.java | 46 ++++++----- .../java/feast/ingestion/ImportJobTest.java | 20 +++-- .../serving/redis/RedisCustomIOTest.java | 19 +++-- serving/pom.xml | 6 +- .../configuration/JobServiceConfig.java | 28 ++----- .../configuration/ServingServiceConfig.java | 15 ++-- .../configuration/SpecServiceConfig.java | 1 - .../configuration/StoreConfiguration.java | 47 +++++++++++ .../redis/JobStoreRedisConfig.java | 62 ++++++++++++++ .../redis/ServingStoreRedisConfig.java | 62 ++++++++++++++ .../service/RedisBackedJobService.java | 35 +++----- .../serving/service/RedisServingService.java | 31 ++++--- .../service/RedisBackedJobServiceTest.java | 19 ++--- .../service/RedisServingServiceTest.java | 80 ++++++++++-------- 18 files changed, 416 insertions(+), 157 deletions(-) create mode 100644 ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java create mode 100644 serving/src/main/java/feast/serving/configuration/StoreConfiguration.java create mode 100644 serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java create mode 100644 serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java diff --git a/ingestion/pom.xml b/ingestion/pom.xml index c829674a64..959fdf86ce 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -216,8 +216,8 @@ - redis.clients - jedis + io.lettuce + lettuce-core diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index 7af98fb8f0..a02b862694 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -43,14 +43,15 @@ import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.types.ValueProto.ValueType.Enum; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisURI; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.exceptions.JedisConnectionException; // TODO: Create partitioned table by default @@ -239,15 +240,16 @@ public static void setupBigQuery( * @param redisConfig Plase refer to feast.core.Store proto */ public static void checkRedisConnection(RedisConfig redisConfig) { - JedisPool jedisPool = new JedisPool(redisConfig.getHost(), redisConfig.getPort()); + RedisClient redisClient = + RedisClient.create(RedisURI.create(redisConfig.getHost(), redisConfig.getPort())); try { - jedisPool.getResource(); - } catch (JedisConnectionException e) { + redisClient.connect(); + } catch (RedisConnectionException e) { throw new RuntimeException( String.format( "Failed to connect to Redis at host: '%s' port: '%d'. Please check that your Redis is running and accessible from Feast.", redisConfig.getHost(), redisConfig.getPort())); } - jedisPool.close(); + redisClient.shutdown(); } } diff --git a/ingestion/src/main/java/feast/retry/Retriable.java b/ingestion/src/main/java/feast/retry/Retriable.java index 0a788fcdd6..30676fe820 100644 --- a/ingestion/src/main/java/feast/retry/Retriable.java +++ b/ingestion/src/main/java/feast/retry/Retriable.java @@ -17,7 +17,7 @@ package feast.retry; public interface Retriable { - void execute(); + void execute() throws Exception; Boolean isExceptionRetriable(Exception e); diff --git a/ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java b/ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java new file mode 100644 index 0000000000..f451419820 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.store.serving.redis; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.TransactionResult; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.codec.ByteArrayCodec; +import java.util.concurrent.ExecutionException; + +public class LettuceTransactionPipeline { + + private StatefulRedisConnection connection; + private RedisAsyncCommands commands; + + protected LettuceTransactionPipeline(RedisClient redisClient) { + connection = redisClient.connect(new ByteArrayCodec()); + this.commands = connection.async(); + } + + public void clear() { + this.commands.discard(); + } + + RedisFuture pexpire(byte[] k, long duration) { + return commands.pexpire(k, duration); + } + + void exec() throws ExecutionException, InterruptedException { + RedisFuture exec = commands.exec(); + exec.get(); + } + + RedisFuture multi() { + return this.commands.multi(); + } + + RedisFuture append(byte[] k, byte[] v) { + return this.commands.append(k, v); + } + + RedisFuture set(byte[] k, byte[] v) { + return this.commands.set(k, v); + } + + RedisFuture lpush(byte[] k, byte[] v) { + return this.commands.lpush(k, v); + } + + RedisFuture rpush(byte[] k, byte[] v) { + return this.commands.rpush(k, v); + } + + RedisFuture sadd(byte[] k, byte[] v) { + return this.commands.sadd(k, v); + } + + RedisFuture zadd(byte[] k, long s, byte[] v) { + return this.commands.zadd(k, s, v); + } + + void close() { + this.clear(); + this.connection.close(); + } +} diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index 8c142b66c9..d70793e4ec 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -20,9 +20,14 @@ import feast.ingestion.values.FailedElement; import feast.retry.BackOffExecutor; import feast.retry.Retriable; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import org.apache.avro.reflect.Nullable; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; @@ -36,10 +41,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Response; -import redis.clients.jedis.exceptions.JedisConnectionException; public class RedisCustomIO { @@ -194,10 +195,10 @@ public static class WriteDoFn extends DoFn { private final BackOffExecutor backOffExecutor; private final List mutations = new ArrayList<>(); - private Jedis jedis; - private Pipeline pipeline; + private LettuceTransactionPipeline pipeline; private int batchSize = DEFAULT_BATCH_SIZE; private int timeout = DEFAULT_TIMEOUT; + private RedisClient redisclient; WriteDoFn(StoreProto.Store.RedisConfig redisConfig) { this.host = redisConfig.getHost(); @@ -224,20 +225,29 @@ public WriteDoFn withTimeout(int timeout) { @Setup public void setup() { - jedis = new Jedis(host, port, timeout); + this.redisclient = + RedisClient.create(new RedisURI(host, port, java.time.Duration.ofMillis(timeout))); } @StartBundle public void startBundle() { + try { + pipeline = new LettuceTransactionPipeline(redisclient); + } catch (RedisConnectionException e) { + log.error("Connection to redis cannot be established ", e); + } mutations.clear(); - pipeline = jedis.pipelined(); } private void executeBatch() throws Exception { backOffExecutor.execute( new Retriable() { @Override - public void execute() { + public void execute() throws ExecutionException, InterruptedException { + if (pipeline == null) { + pipeline = new LettuceTransactionPipeline(redisclient); + } + pipeline.clear(); pipeline.multi(); mutations.forEach( mutation -> { @@ -247,24 +257,22 @@ public void execute() { } }); pipeline.exec(); - pipeline.sync(); + pipeline.clear(); mutations.clear(); } @Override public Boolean isExceptionRetriable(Exception e) { - return e instanceof JedisConnectionException; + return e instanceof RedisConnectionException + || e instanceof ExecutionException + || e instanceof InterruptedException; } @Override public void cleanUpAfterFailure() { - try { - pipeline.close(); - } catch (IOException e) { - log.error(String.format("Error while closing pipeline: %s", e.getMessage())); + if (pipeline != null) { + pipeline.clear(); } - jedis = new Jedis(host, port, timeout); - pipeline = jedis.pipelined(); } }); } @@ -299,7 +307,7 @@ public void processElement(ProcessContext context) { } } - private Response writeRecord(RedisMutation mutation) { + private RedisFuture writeRecord(RedisMutation mutation) { switch (mutation.getMethod()) { case APPEND: return pipeline.append(mutation.getKey(), mutation.getValue()); @@ -339,7 +347,7 @@ public void finishBundle(FinishBundleContext context) @Teardown public void teardown() { - jedis.close(); + redisclient.shutdown(); } } } diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 1148fa4042..7546d7e36e 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -38,6 +38,11 @@ import feast.test.TestUtil.LocalRedis; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.ValueType.Enum; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.ByteArrayCodec; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -57,7 +62,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; public class ImportJobTest { @@ -206,21 +210,24 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() Duration.standardSeconds(IMPORT_JOB_CHECK_INTERVAL_DURATION_SEC)); LOGGER.info("Validating the actual values written to Redis ..."); - Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); + RedisClient redisClient = + RedisClient.create(new RedisURI(REDIS_HOST, REDIS_PORT, java.time.Duration.ofMillis(2000))); + StatefulRedisConnection connection = redisClient.connect(new ByteArrayCodec()); + RedisCommands sync = connection.sync(); expected.forEach( (key, expectedValue) -> { // Ensure ingested key exists. - byte[] actualByteValue = jedis.get(key.toByteArray()); + byte[] actualByteValue = sync.get(key.toByteArray()); if (actualByteValue == null) { LOGGER.error("Key not found in Redis: " + key); LOGGER.info("Redis INFO:"); - LOGGER.info(jedis.info()); - String randomKey = jedis.randomKey(); + LOGGER.info(sync.info()); + byte[] randomKey = sync.randomkey(); if (randomKey != null) { LOGGER.info("Sample random key, value (for debugging purpose):"); LOGGER.info("Key: " + randomKey); - LOGGER.info("Value: " + jedis.get(randomKey)); + LOGGER.info("Value: " + sync.get(randomKey)); } Assert.fail("Missing key in Redis."); } @@ -239,5 +246,6 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() // Ensure the retrieved FeatureRow is equal to the ingested FeatureRow. Assert.assertEquals(expectedValue, actualValue); }); + redisClient.shutdown(); } } diff --git a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java index fc17f6207f..d2673a481f 100644 --- a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java @@ -26,6 +26,11 @@ import feast.store.serving.redis.RedisCustomIO.RedisMutation; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.ValueType.Enum; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisStringCommands; +import io.lettuce.core.codec.ByteArrayCodec; import java.io.IOException; import java.util.HashMap; import java.util.LinkedHashMap; @@ -43,7 +48,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import redis.clients.jedis.Jedis; import redis.embedded.Redis; import redis.embedded.RedisServer; @@ -53,17 +57,22 @@ public class RedisCustomIOTest { private static String REDIS_HOST = "localhost"; private static int REDIS_PORT = 51234; private Redis redis; - private Jedis jedis; + private RedisClient redisClient; + private RedisStringCommands sync; @Before public void setUp() throws IOException { redis = new RedisServer(REDIS_PORT); redis.start(); - jedis = new Jedis(REDIS_HOST, REDIS_PORT); + redisClient = + RedisClient.create(new RedisURI(REDIS_HOST, REDIS_PORT, java.time.Duration.ofMillis(2000))); + StatefulRedisConnection connection = redisClient.connect(new ByteArrayCodec()); + sync = connection.sync(); } @After public void teardown() { + redisClient.shutdown(); redis.stop(); } @@ -110,7 +119,7 @@ public void shouldWriteToRedis() { kvs.forEach( (key, value) -> { - byte[] actual = jedis.get(key.toByteArray()); + byte[] actual = sync.get(key.toByteArray()); assertThat(actual, equalTo(value.toByteArray())); }); } @@ -169,7 +178,7 @@ public void shouldRetryFailConnection() throws InterruptedException { kvs.forEach( (key, value) -> { - byte[] actual = jedis.get(key.toByteArray()); + byte[] actual = sync.get(key.toByteArray()); assertThat(actual, equalTo(value.toByteArray())); }); } diff --git a/serving/pom.xml b/serving/pom.xml index be573be45c..17700a351b 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -138,11 +138,11 @@ 3.1.0 - - redis.clients - jedis + io.lettuce + lettuce-core + com.google.guava diff --git a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java index 4c6b652c46..fa94dab832 100644 --- a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java @@ -22,42 +22,24 @@ import feast.serving.service.NoopJobService; import feast.serving.service.RedisBackedJobService; import feast.serving.specs.CachedSpecService; -import java.util.Map; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; @Configuration public class JobServiceConfig { - public static final String DEFAULT_REDIS_MAX_CONN = "8"; - public static final String DEFAULT_REDIS_MAX_IDLE = "8"; - public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50"; - @Bean - public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) { + public JobService jobService( + FeastProperties feastProperties, + CachedSpecService specService, + StoreConfiguration storeConfiguration) { if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) { return new NoopJobService(); } StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType()); - Map storeOptions = feastProperties.getJobs().getStoreOptions(); switch (storeType) { case REDIS: - JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); - jedisPoolConfig.setMaxTotal( - Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN))); - jedisPoolConfig.setMaxIdle( - Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE))); - jedisPoolConfig.setMaxWaitMillis( - Integer.parseInt( - storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS))); - JedisPool jedisPool = - new JedisPool( - jedisPoolConfig, - storeOptions.get("host"), - Integer.parseInt(storeOptions.get("port"))); - return new RedisBackedJobService(jedisPool); + return new RedisBackedJobService(storeConfiguration.getJobStoreRedisConnection()); case INVALID: case BIGQUERY: case CASSANDRA: diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index 3cc115978a..d0ea058baf 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -36,8 +36,6 @@ import org.slf4j.Logger; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; @Configuration public class ServingServiceConfig { @@ -74,19 +72,16 @@ public ServingService servingService( FeastProperties feastProperties, CachedSpecService specService, JobService jobService, - Tracer tracer) { + Tracer tracer, + StoreConfiguration storeConfiguration) { ServingService servingService = null; Store store = specService.getStore(); switch (store.getType()) { case REDIS: - RedisConfig redisConfig = store.getRedisConfig(); - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxTotal(feastProperties.getStore().getRedisPoolMaxSize()); - poolConfig.setMaxIdle(feastProperties.getStore().getRedisPoolMaxIdle()); - JedisPool jedisPool = - new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort()); - servingService = new RedisServingService(jedisPool, specService, tracer); + servingService = + new RedisServingService( + storeConfiguration.getServingRedisConnection(), specService, tracer); break; case BIGQUERY: BigQueryConfig bqConfig = store.getBigqueryConfig(); diff --git a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java b/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java index 0b3a2938b8..26ebfa956c 100644 --- a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java @@ -59,7 +59,6 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( @Bean public CachedSpecService specService(FeastProperties feastProperties) { - CoreSpecService coreService = new CoreSpecService(feastCoreHost, feastCorePort); Path path = Paths.get(feastProperties.getStore().getConfigPath()); CachedSpecService cachedSpecStorage = new CachedSpecService(coreService, path); diff --git a/serving/src/main/java/feast/serving/configuration/StoreConfiguration.java b/serving/src/main/java/feast/serving/configuration/StoreConfiguration.java new file mode 100644 index 0000000000..84dc7b7f8d --- /dev/null +++ b/serving/src/main/java/feast/serving/configuration/StoreConfiguration.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.configuration; + +import io.lettuce.core.api.StatefulRedisConnection; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class StoreConfiguration { + + // We can define other store specific beans here + // These beans can be autowired or can be created in this class. + private final StatefulRedisConnection servingRedisConnection; + private final StatefulRedisConnection jobStoreRedisConnection; + + @Autowired + public StoreConfiguration( + ObjectProvider> servingRedisConnection, + ObjectProvider> jobStoreRedisConnection) { + this.servingRedisConnection = servingRedisConnection.getIfAvailable(); + this.jobStoreRedisConnection = jobStoreRedisConnection.getIfAvailable(); + } + + public StatefulRedisConnection getServingRedisConnection() { + return servingRedisConnection; + } + + public StatefulRedisConnection getJobStoreRedisConnection() { + return jobStoreRedisConnection; + } +} diff --git a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java new file mode 100644 index 0000000000..e0bce095b3 --- /dev/null +++ b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.configuration.redis; + +import feast.core.StoreProto; +import feast.serving.FeastProperties; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; +import java.util.Map; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class JobStoreRedisConfig { + + @Bean(destroyMethod = "shutdown") + ClientResources jobStoreClientResources() { + return DefaultClientResources.create(); + } + + @Bean(destroyMethod = "shutdown") + RedisClient jobStoreRedisClient( + ClientResources jobStoreClientResources, FeastProperties feastProperties) { + try { + if (StoreProto.Store.StoreType.valueOf(feastProperties.getJobs().getStoreType()) + != StoreProto.Store.StoreType.REDIS) return null; + Map jobStoreConf = feastProperties.getJobs().getStoreOptions(); + RedisURI uri = + RedisURI.create(jobStoreConf.get("host"), Integer.parseInt(jobStoreConf.get("port"))); + return RedisClient.create(jobStoreClientResources, uri); + } catch (Exception e) { + // If the store type is empty or keys are not not properly set. + return null; + } + } + + @Bean(destroyMethod = "close") + StatefulRedisConnection jobStoreRedisConnection( + ObjectProvider jobStoreRedisClient) { + if (jobStoreRedisClient.getIfAvailable() == null) return null; + return jobStoreRedisClient.getIfAvailable().connect(new ByteArrayCodec()); + } +} diff --git a/serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java b/serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java new file mode 100644 index 0000000000..17a50eef6d --- /dev/null +++ b/serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.configuration.redis; + +import feast.core.StoreProto; +import feast.serving.specs.CachedSpecService; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.context.annotation.*; + +@Configuration +public class ServingStoreRedisConfig { + + @Bean + StoreProto.Store.RedisConfig servingStoreRedisConf(CachedSpecService specService) { + if (specService.getStore().getType() != StoreProto.Store.StoreType.REDIS) return null; + return specService.getStore().getRedisConfig(); + } + + @Bean(destroyMethod = "shutdown") + ClientResources servingClientResources() { + return DefaultClientResources.create(); + } + + @Bean(destroyMethod = "shutdown") + RedisClient servingRedisClient( + ClientResources servingClientResources, + ObjectProvider servingStoreRedisConf) { + if (servingStoreRedisConf.getIfAvailable() == null) return null; + RedisURI redisURI = + RedisURI.create( + servingStoreRedisConf.getIfAvailable().getHost(), + servingStoreRedisConf.getIfAvailable().getPort()); + return RedisClient.create(servingClientResources, redisURI); + } + + @Bean(destroyMethod = "close") + StatefulRedisConnection servingRedisConnection( + ObjectProvider servingRedisClient) { + if (servingRedisClient.getIfAvailable() == null) return null; + return servingRedisClient.getIfAvailable().connect(new ByteArrayCodec()); + } +} diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 230e20cd78..0bf5363037 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -19,12 +19,11 @@ import com.google.protobuf.util.JsonFormat; import feast.serving.ServingAPIProto.Job; import feast.serving.ServingAPIProto.Job.Builder; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; import java.util.Optional; import org.joda.time.Duration; import org.slf4j.Logger; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.exceptions.JedisConnectionException; // TODO: Do rate limiting, currently if clients call get() or upsert() // and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait @@ -33,53 +32,41 @@ public class RedisBackedJobService implements JobService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class); - private final JedisPool jedisPool; + private final RedisCommands syncCommand; // Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory // and since users normally don't require info about relatively old jobs. private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds(); - public RedisBackedJobService(JedisPool jedisPool) { - this.jedisPool = jedisPool; + public RedisBackedJobService(StatefulRedisConnection connection) { + this.syncCommand = connection.sync(); } @Override public Optional get(String id) { - Jedis jedis = null; Job job = null; try { - jedis = jedisPool.getResource(); - String json = jedis.get(id); - if (json == null) { + String json = new String(syncCommand.get(id.getBytes())); + if (json.isEmpty()) { return Optional.empty(); } Builder builder = Job.newBuilder(); JsonFormat.parser().merge(json, builder); job = builder.build(); - } catch (JedisConnectionException e) { - log.error(String.format("Failed to connect to the redis instance: %s", e)); } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage())); - } finally { - if (jedis != null) { - jedis.close(); - } } return Optional.ofNullable(job); } @Override public void upsert(Job job) { - Jedis jedis = null; try { - jedis = jedisPool.getResource(); - jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job)); - jedis.expire(job.getId(), defaultExpirySeconds); + syncCommand.set( + job.getId().getBytes(), + JsonFormat.printer().omittingInsignificantWhitespace().print(job).getBytes()); + syncCommand.expire(job.getId().getBytes(), defaultExpirySeconds); } catch (Exception e) { log.error(String.format("Failed to upsert job: %s", e.getMessage())); - } finally { - if (jedis != null) { - jedis.close(); - } } } } diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java index 48fc485214..a5ec73b136 100644 --- a/serving/src/main/java/feast/serving/service/RedisServingService.java +++ b/serving/src/main/java/feast/serving/service/RedisServingService.java @@ -49,24 +49,27 @@ import feast.types.FieldProto.Field; import feast.types.ValueProto.Value; import io.grpc.Status; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; import io.opentracing.Scope; import io.opentracing.Tracer; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.slf4j.Logger; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class RedisServingService implements ServingService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisServingService.class); - private final JedisPool jedisPool; private final CachedSpecService specService; private final Tracer tracer; + private final RedisCommands syncCommands; - public RedisServingService(JedisPool jedisPool, CachedSpecService specService, Tracer tracer) { - this.jedisPool = jedisPool; + public RedisServingService( + StatefulRedisConnection connection, + CachedSpecService specService, + Tracer tracer) { + this.syncCommands = connection.sync(); this.specService = specService; this.tracer = tracer; } @@ -194,7 +197,7 @@ private void sendAndProcessMultiGet( FeatureSetRequest featureSetRequest) throws InvalidProtocolBufferException { - List jedisResps = sendMultiGet(redisKeys); + List values = sendMultiGet(redisKeys); long startTime = System.currentTimeMillis(); try (Scope scope = tracer.buildSpan("Redis-processResponse").startActive(true)) { FeatureSetSpec spec = featureSetRequest.getSpec(); @@ -206,12 +209,12 @@ private void sendAndProcessMultiGet( RefUtil::generateFeatureStringRef, featureReference -> Value.newBuilder().build())); - for (int i = 0; i < jedisResps.size(); i++) { + for (int i = 0; i < values.size(); i++) { EntityRow entityRow = entityRows.get(i); Map featureValues = featureValuesMap.get(entityRow); - byte[] jedisResponse = jedisResps.get(i); - if (jedisResponse == null) { + byte[] value = values.get(i); + if (value == null) { featureSetRequest .getFeatureReferences() .parallelStream() @@ -226,7 +229,7 @@ private void sendAndProcessMultiGet( continue; } - FeatureRow featureRow = FeatureRow.parseFrom(jedisResponse); + FeatureRow featureRow = FeatureRow.parseFrom(value); boolean stale = isStale(featureSetRequest, entityRow, featureRow); if (stale) { @@ -298,13 +301,15 @@ private boolean isStale( private List sendMultiGet(List keys) { try (Scope scope = tracer.buildSpan("Redis-sendMultiGet").startActive(true)) { long startTime = System.currentTimeMillis(); - try (Jedis jedis = jedisPool.getResource()) { + try { byte[][] binaryKeys = keys.stream() .map(AbstractMessageLite::toByteArray) .collect(Collectors.toList()) .toArray(new byte[0][0]); - return jedis.mget(binaryKeys); + return syncCommands.mget(binaryKeys).stream() + .map(io.lettuce.core.Value::getValue) + .collect(Collectors.toList()); } catch (Exception e) { throw Status.NOT_FOUND .withDescription("Unable to retrieve feature from Redis") @@ -313,7 +318,7 @@ private List sendMultiGet(List keys) { } finally { requestLatency .labels("sendMultiGet") - .observe((System.currentTimeMillis() - startTime) / 1000); + .observe((System.currentTimeMillis() - startTime) / 1000.0); } } } diff --git a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java index 9247375f59..34bc31d2c2 100644 --- a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java @@ -16,17 +16,17 @@ */ package feast.serving.service; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.codec.ByteArrayCodec; import java.io.IOException; import org.junit.After; import org.junit.Before; import org.junit.Test; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; import redis.embedded.RedisServer; public class RedisBackedJobServiceTest { - private static String REDIS_HOST = "localhost"; - private static int REDIS_PORT = 51235; + private static Integer REDIS_PORT = 51235; private RedisServer redis; @Before @@ -41,12 +41,10 @@ public void teardown() { } @Test - public void shouldRecoverIfRedisConnectionIsLost() { - JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); - jedisPoolConfig.setMaxTotal(1); - jedisPoolConfig.setMaxWaitMillis(10); - JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT); - RedisBackedJobService jobService = new RedisBackedJobService(jedisPool); + public void shouldRecoverIfRedisConnectionIsLost() throws IOException { + RedisClient client = RedisClient.create(RedisURI.create("localhost", REDIS_PORT)); + RedisBackedJobService jobService = + new RedisBackedJobService(client.connect(new ByteArrayCodec())); jobService.get("does not exist"); redis.stop(); try { @@ -56,5 +54,6 @@ public void shouldRecoverIfRedisConnectionIsLost() { } redis.start(); jobService.get("does not exist"); + client.shutdown(); } } diff --git a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java index 042107e117..8446218cff 100644 --- a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java @@ -38,38 +38,37 @@ import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto.Field; import feast.types.ValueProto.Value; +import io.lettuce.core.KeyValue; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; import io.opentracing.Tracer; import io.opentracing.Tracer.SpanBuilder; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class RedisServingServiceTest { - @Mock JedisPool jedisPool; - - @Mock Jedis jedis; - @Mock CachedSpecService specService; @Mock Tracer tracer; + @Mock StatefulRedisConnection connection; + + @Mock RedisCommands syncCommands; + private RedisServingService redisServingService; private byte[][] redisKeyList; @Before public void setUp() { initMocks(this); - - redisServingService = new RedisServingService(jedisPool, specService, tracer); + when(connection.sync()).thenReturn(syncCommands); + redisServingService = new RedisServingService(connection, specService, tracer); redisKeyList = Lists.newArrayList( RedisKey.newBuilder() @@ -149,12 +148,14 @@ public void shouldReturnResponseWithValuesIfKeysPresent() { .setSpec(getFeatureSetSpec()) .build(); - List featureRowBytes = - featureRows.stream().map(AbstractMessageLite::toByteArray).collect(Collectors.toList()); + List> featureRowBytes = + featureRows.stream() + .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) + .collect(Collectors.toList()); when(specService.getFeatureSets(request.getFeaturesList())) .thenReturn(Collections.singletonList(featureSetRequest)); - when(jedisPool.getResource()).thenReturn(jedis); - when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(connection.sync()).thenReturn(syncCommands); + when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); GetOnlineFeaturesResponse expected = @@ -234,12 +235,14 @@ public void shouldReturnResponseWithValuesWhenFeatureSetSpecHasUnspecifiedMaxAge .setSpec(getFeatureSetSpecWithNoMaxAge()) .build(); - List featureRowBytes = - featureRows.stream().map(AbstractMessageLite::toByteArray).collect(Collectors.toList()); + List> featureRowBytes = + featureRows.stream() + .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) + .collect(Collectors.toList()); when(specService.getFeatureSets(request.getFeaturesList())) .thenReturn(Collections.singletonList(featureSetRequest)); - when(jedisPool.getResource()).thenReturn(jedis); - when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(connection.sync()).thenReturn(syncCommands); + when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); GetOnlineFeaturesResponse expected = @@ -315,12 +318,14 @@ public void shouldReturnKeysWithoutVersionifNotProvided() { .setSpec(getFeatureSetSpec()) .build(); - List featureRowBytes = - featureRows.stream().map(AbstractMessageLite::toByteArray).collect(Collectors.toList()); + List> featureRowBytes = + featureRows.stream() + .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) + .collect(Collectors.toList()); when(specService.getFeatureSets(request.getFeaturesList())) .thenReturn(Collections.singletonList(featureSetRequest)); - when(jedisPool.getResource()).thenReturn(jedis); - when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(connection.sync()).thenReturn(syncCommands); + when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); GetOnlineFeaturesResponse expected = @@ -401,11 +406,14 @@ public void shouldReturnResponseWithUnsetValuesIfKeysNotPresent() { .setSpec(getFeatureSetSpec()) .build(); - List featureRowBytes = Lists.newArrayList(featureRows.get(0).toByteArray(), null); + List> featureRowBytes = + featureRows.stream() + .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) + .collect(Collectors.toList()); when(specService.getFeatureSets(request.getFeaturesList())) .thenReturn(Collections.singletonList(featureSetRequest)); - when(jedisPool.getResource()).thenReturn(jedis); - when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(connection.sync()).thenReturn(syncCommands); + when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); GetOnlineFeaturesResponse expected = @@ -489,12 +497,14 @@ public void shouldReturnResponseWithUnsetValuesIfMaxAgeIsExceeded() { .setSpec(spec) .build(); - List featureRowBytes = - featureRows.stream().map(AbstractMessageLite::toByteArray).collect(Collectors.toList()); + List> featureRowBytes = + featureRows.stream() + .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) + .collect(Collectors.toList()); when(specService.getFeatureSets(request.getFeaturesList())) .thenReturn(Collections.singletonList(featureSetRequest)); - when(jedisPool.getResource()).thenReturn(jedis); - when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(connection.sync()).thenReturn(syncCommands); + when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); GetOnlineFeaturesResponse expected = @@ -569,12 +579,14 @@ public void shouldFilterOutUndesiredRows() { .setSpec(getFeatureSetSpec()) .build(); - List featureRowBytes = - featureRows.stream().map(AbstractMessageLite::toByteArray).collect(Collectors.toList()); + List> featureRowBytes = + featureRows.stream() + .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) + .collect(Collectors.toList()); when(specService.getFeatureSets(request.getFeaturesList())) .thenReturn(Collections.singletonList(featureSetRequest)); - when(jedisPool.getResource()).thenReturn(jedis); - when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(connection.sync()).thenReturn(syncCommands); + when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); GetOnlineFeaturesResponse expected = From ccecd6126c10e4c3a141a672ae5e563e288eeed0 Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Tue, 25 Feb 2020 21:00:09 +0800 Subject: [PATCH 2/8] Removing extra lines --- .../main/java/feast/store/serving/redis/RedisCustomIO.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index bf80f781f7..d70793e4ec 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -41,10 +41,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Response; -import redis.clients.jedis.exceptions.JedisConnectionException; public class RedisCustomIO { @@ -241,7 +237,6 @@ public void startBundle() { log.error("Connection to redis cannot be established ", e); } mutations.clear(); - pipeline = jedis.pipelined(); } private void executeBatch() throws Exception { @@ -278,8 +273,6 @@ public void cleanUpAfterFailure() { if (pipeline != null) { pipeline.clear(); } - jedis = new Jedis(host, port, timeout); - pipeline = jedis.pipelined(); } }); } From 6a7fb0c18020d6b4698f4f2fe89b824ce4b882d8 Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Wed, 26 Feb 2020 02:11:35 +0800 Subject: [PATCH 3/8] Abstacting redis connection based on store --- .../ingestion/transform/WriteToStore.java | 4 +- .../redis/LettuceTransactionPipeline.java | 82 ------------ .../store/serving/redis/RedisCustomIO.java | 126 ++++++++---------- .../serving/redis/RedisIngestionClient.java | 49 +++++++ .../redis/RedisStandaloneIngestionClient.java | 116 ++++++++++++++++ .../serving/redis/RedisCustomIOTest.java | 21 ++- 6 files changed, 240 insertions(+), 158 deletions(-) delete mode 100644 ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java create mode 100644 ingestion/src/main/java/feast/store/serving/redis/RedisIngestionClient.java create mode 100644 ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index b7901c2f90..4e9082f555 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -22,7 +22,6 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; -import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.ingestion.options.ImportOptions; import feast.ingestion.utils.ResourceUtil; @@ -88,13 +87,12 @@ public PDone expand(PCollection input) { switch (storeType) { case REDIS: - RedisConfig redisConfig = getStore().getRedisConfig(); PCollection redisWriteResult = input .apply( "FeatureRowToRedisMutation", ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets()))) - .apply("WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig)); + .apply("WriteRedisMutationToRedis", RedisCustomIO.write(getStore())); if (options.getDeadLetterTableSpec() != null) { redisWriteResult.apply( WriteFailedElementToBigQuery.newBuilder() diff --git a/ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java b/ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java deleted file mode 100644 index f451419820..0000000000 --- a/ingestion/src/main/java/feast/store/serving/redis/LettuceTransactionPipeline.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.store.serving.redis; - -import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.TransactionResult; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.async.RedisAsyncCommands; -import io.lettuce.core.codec.ByteArrayCodec; -import java.util.concurrent.ExecutionException; - -public class LettuceTransactionPipeline { - - private StatefulRedisConnection connection; - private RedisAsyncCommands commands; - - protected LettuceTransactionPipeline(RedisClient redisClient) { - connection = redisClient.connect(new ByteArrayCodec()); - this.commands = connection.async(); - } - - public void clear() { - this.commands.discard(); - } - - RedisFuture pexpire(byte[] k, long duration) { - return commands.pexpire(k, duration); - } - - void exec() throws ExecutionException, InterruptedException { - RedisFuture exec = commands.exec(); - exec.get(); - } - - RedisFuture multi() { - return this.commands.multi(); - } - - RedisFuture append(byte[] k, byte[] v) { - return this.commands.append(k, v); - } - - RedisFuture set(byte[] k, byte[] v) { - return this.commands.set(k, v); - } - - RedisFuture lpush(byte[] k, byte[] v) { - return this.commands.lpush(k, v); - } - - RedisFuture rpush(byte[] k, byte[] v) { - return this.commands.rpush(k, v); - } - - RedisFuture sadd(byte[] k, byte[] v) { - return this.commands.sadd(k, v); - } - - RedisFuture zadd(byte[] k, long s, byte[] v) { - return this.commands.zadd(k, s, v); - } - - void close() { - this.clear(); - this.connection.close(); - } -} diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index d70793e4ec..c9f34032ec 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -18,14 +18,11 @@ import feast.core.StoreProto; import feast.ingestion.values.FailedElement; -import feast.retry.BackOffExecutor; import feast.retry.Retriable; -import io.lettuce.core.RedisClient; import io.lettuce.core.RedisConnectionException; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.RedisURI; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import org.apache.avro.reflect.Nullable; @@ -37,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +47,8 @@ public class RedisCustomIO { private RedisCustomIO() {} - public static Write write(StoreProto.Store.RedisConfig redisConfig) { - return new Write(redisConfig); + public static Write write(StoreProto.Store store) { + return new Write(store); } public enum Method { @@ -169,8 +165,8 @@ public static class Write private WriteDoFn dofn; - private Write(StoreProto.Store.RedisConfig redisConfig) { - this.dofn = new WriteDoFn(redisConfig); + private Write(StoreProto.Store store) { + this.dofn = new WriteDoFn(store); } public Write withBatchSize(int batchSize) { @@ -190,23 +186,14 @@ public PCollection expand(PCollection input) { public static class WriteDoFn extends DoFn { - private final String host; - private final int port; - private final BackOffExecutor backOffExecutor; private final List mutations = new ArrayList<>(); - - private LettuceTransactionPipeline pipeline; private int batchSize = DEFAULT_BATCH_SIZE; private int timeout = DEFAULT_TIMEOUT; - private RedisClient redisclient; - - WriteDoFn(StoreProto.Store.RedisConfig redisConfig) { - this.host = redisConfig.getHost(); - this.port = redisConfig.getPort(); - long backoffMs = - redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1; - this.backOffExecutor = - new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs)); + private RedisIngestionClient ingestionClient; + + WriteDoFn(StoreProto.Store store) { + if (store.getType() == StoreProto.Store.StoreType.REDIS) + this.ingestionClient = new RedisStandaloneIngestionClient(store.getRedisConfig()); } public WriteDoFn withBatchSize(int batchSize) { @@ -225,14 +212,13 @@ public WriteDoFn withTimeout(int timeout) { @Setup public void setup() { - this.redisclient = - RedisClient.create(new RedisURI(host, port, java.time.Duration.ofMillis(timeout))); + this.ingestionClient.setup(); } @StartBundle public void startBundle() { try { - pipeline = new LettuceTransactionPipeline(redisclient); + ingestionClient.connect(); } catch (RedisConnectionException e) { log.error("Connection to redis cannot be established ", e); } @@ -240,41 +226,35 @@ public void startBundle() { } private void executeBatch() throws Exception { - backOffExecutor.execute( - new Retriable() { - @Override - public void execute() throws ExecutionException, InterruptedException { - if (pipeline == null) { - pipeline = new LettuceTransactionPipeline(redisclient); - } - pipeline.clear(); - pipeline.multi(); - mutations.forEach( - mutation -> { - writeRecord(mutation); - if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) { - pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis()); - } - }); - pipeline.exec(); - pipeline.clear(); - mutations.clear(); - } - - @Override - public Boolean isExceptionRetriable(Exception e) { - return e instanceof RedisConnectionException - || e instanceof ExecutionException - || e instanceof InterruptedException; - } - - @Override - public void cleanUpAfterFailure() { - if (pipeline != null) { - pipeline.clear(); - } - } - }); + this.ingestionClient + .getBackOffExecutor() + .execute( + new Retriable() { + @Override + public void execute() throws ExecutionException, InterruptedException { + if (!ingestionClient.isConnected()) { + ingestionClient.connect(); + } + mutations.forEach( + mutation -> { + writeRecord(mutation); + if (mutation.getExpiryMillis() != null + && mutation.getExpiryMillis() > 0) { + ingestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis()); + } + }); + ingestionClient.sync(); + mutations.clear(); + } + + @Override + public Boolean isExceptionRetriable(Exception e) { + return e instanceof RedisConnectionException; + } + + @Override + public void cleanUpAfterFailure() {} + }); } private FailedElement toFailedElement( @@ -282,7 +262,7 @@ private FailedElement toFailedElement( return FailedElement.newBuilder() .setJobName(jobName) .setTransformName("RedisCustomIO") - .setPayload(mutation.getValue().toString()) + .setPayload(Arrays.toString(mutation.getValue())) .setErrorMessage(exception.getMessage()) .setStackTrace(ExceptionUtils.getStackTrace(exception)) .build(); @@ -307,20 +287,26 @@ public void processElement(ProcessContext context) { } } - private RedisFuture writeRecord(RedisMutation mutation) { + private void writeRecord(RedisMutation mutation) { switch (mutation.getMethod()) { case APPEND: - return pipeline.append(mutation.getKey(), mutation.getValue()); + ingestionClient.append(mutation.getKey(), mutation.getValue()); + return; case SET: - return pipeline.set(mutation.getKey(), mutation.getValue()); + ingestionClient.set(mutation.getKey(), mutation.getValue()); + return; case LPUSH: - return pipeline.lpush(mutation.getKey(), mutation.getValue()); + ingestionClient.lpush(mutation.getKey(), mutation.getValue()); + return; case RPUSH: - return pipeline.rpush(mutation.getKey(), mutation.getValue()); + ingestionClient.rpush(mutation.getKey(), mutation.getValue()); + return; case SADD: - return pipeline.sadd(mutation.getKey(), mutation.getValue()); + ingestionClient.sadd(mutation.getKey(), mutation.getValue()); + return; case ZADD: - return pipeline.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue()); + ingestionClient.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue()); + return; default: throw new UnsupportedOperationException( String.format("Not implemented writing records for %s", mutation.getMethod())); @@ -347,7 +333,7 @@ public void finishBundle(FinishBundleContext context) @Teardown public void teardown() { - redisclient.shutdown(); + ingestionClient.shutdown(); } } } diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisIngestionClient.java b/ingestion/src/main/java/feast/store/serving/redis/RedisIngestionClient.java new file mode 100644 index 0000000000..d51eead53f --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisIngestionClient.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.store.serving.redis; + +import feast.retry.BackOffExecutor; +import java.io.Serializable; + +public interface RedisIngestionClient extends Serializable { + + void setup(); + + BackOffExecutor getBackOffExecutor(); + + void shutdown(); + + void connect(); + + boolean isConnected(); + + void sync(); + + void pexpire(byte[] key, Long expiryMillis); + + void append(byte[] key, byte[] value); + + void set(byte[] key, byte[] value); + + void lpush(byte[] key, byte[] value); + + void rpush(byte[] key, byte[] value); + + void sadd(byte[] key, byte[] value); + + void zadd(byte[] key, Long score, byte[] value); +} diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java b/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java new file mode 100644 index 0000000000..7953f8c101 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.store.serving.redis; + +import com.google.common.collect.Lists; +import feast.core.StoreProto; +import feast.retry.BackOffExecutor; +import io.lettuce.core.*; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.codec.ByteArrayCodec; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.joda.time.Duration; + +public class RedisStandaloneIngestionClient implements RedisIngestionClient { + private final String host; + private final int port; + private final BackOffExecutor backOffExecutor; + private RedisClient redisclient; + private static final int DEFAULT_TIMEOUT = 2000; + private StatefulRedisConnection connection; + private RedisAsyncCommands commands; + private List futures = Lists.newArrayList(); + + public RedisStandaloneIngestionClient(StoreProto.Store.RedisConfig redisConfig) { + this.host = redisConfig.getHost(); + this.port = redisConfig.getPort(); + long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1; + this.backOffExecutor = + new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs)); + } + + @Override + public void setup() { + this.redisclient = + RedisClient.create(new RedisURI(host, port, java.time.Duration.ofMillis(DEFAULT_TIMEOUT))); + } + + @Override + public BackOffExecutor getBackOffExecutor() { + return this.backOffExecutor; + } + + @Override + public void shutdown() { + this.redisclient.shutdown(); + } + + @Override + public void connect() { + this.connection = this.redisclient.connect(new ByteArrayCodec()); + this.commands = connection.async(); + } + + @Override + public boolean isConnected() { + return connection != null; + } + + @Override + public void sync() { + // Wait for some time for futures to complete + LettuceFutures.awaitAll(60, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0])); + futures.clear(); + } + + @Override + public void pexpire(byte[] key, Long expiryMillis) { + commands.pexpire(key, expiryMillis); + } + + @Override + public void append(byte[] key, byte[] value) { + futures.add(commands.append(key, value)); + } + + @Override + public void set(byte[] key, byte[] value) { + futures.add(commands.set(key, value)); + } + + @Override + public void lpush(byte[] key, byte[] value) { + futures.add(commands.lpush(key, value)); + } + + @Override + public void rpush(byte[] key, byte[] value) { + futures.add(commands.rpush(key, value)); + } + + @Override + public void sadd(byte[] key, byte[] value) { + futures.add(commands.sadd(key, value)); + } + + @Override + public void zadd(byte[] key, Long score, byte[] value) { + futures.add(commands.zadd(key, score, value)); + } +} diff --git a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java index d2673a481f..75663d24a6 100644 --- a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java @@ -114,7 +114,12 @@ public void shouldWriteToRedis() { null)) .collect(Collectors.toList()); - p.apply(Create.of(featureRowWrites)).apply(RedisCustomIO.write(redisConfig)); + StoreProto.Store store = + StoreProto.Store.newBuilder() + .setRedisConfig(redisConfig) + .setType(StoreProto.Store.StoreType.REDIS) + .build(); + p.apply(Create.of(featureRowWrites)).apply(RedisCustomIO.write(store)); p.run(); kvs.forEach( @@ -157,9 +162,14 @@ public void shouldRetryFailConnection() throws InterruptedException { null)) .collect(Collectors.toList()); + StoreProto.Store store = + StoreProto.Store.newBuilder() + .setRedisConfig(redisConfig) + .setType(StoreProto.Store.StoreType.REDIS) + .build(); PCollection failedElementCount = p.apply(Create.of(featureRowWrites)) - .apply(RedisCustomIO.write(redisConfig)) + .apply(RedisCustomIO.write(store)) .apply(Count.globally()); redis.stop(); @@ -211,9 +221,14 @@ public void shouldProduceFailedElementIfRetryExceeded() { null)) .collect(Collectors.toList()); + StoreProto.Store store = + StoreProto.Store.newBuilder() + .setRedisConfig(redisConfig) + .setType(StoreProto.Store.StoreType.REDIS) + .build(); PCollection failedElementCount = p.apply(Create.of(featureRowWrites)) - .apply(RedisCustomIO.write(redisConfig)) + .apply(RedisCustomIO.write(store)) .apply(Count.globally()); redis.stop(); From 8953a7a40e3892589ec47dd5b1332f72f659710a Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Wed, 26 Feb 2020 08:47:27 +0800 Subject: [PATCH 4/8] Check the connection before connecting as lettuce does the retry automatically --- .../store/serving/redis/RedisCustomIO.java | 32 +++++++++---------- .../redis/RedisStandaloneIngestionClient.java | 7 ++-- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index c9f34032ec..31b9c0553f 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -189,11 +189,11 @@ public static class WriteDoFn extends DoFn { private final List mutations = new ArrayList<>(); private int batchSize = DEFAULT_BATCH_SIZE; private int timeout = DEFAULT_TIMEOUT; - private RedisIngestionClient ingestionClient; + private RedisIngestionClient redisIngestionClient; WriteDoFn(StoreProto.Store store) { if (store.getType() == StoreProto.Store.StoreType.REDIS) - this.ingestionClient = new RedisStandaloneIngestionClient(store.getRedisConfig()); + this.redisIngestionClient = new RedisStandaloneIngestionClient(store.getRedisConfig()); } public WriteDoFn withBatchSize(int batchSize) { @@ -212,13 +212,13 @@ public WriteDoFn withTimeout(int timeout) { @Setup public void setup() { - this.ingestionClient.setup(); + this.redisIngestionClient.setup(); } @StartBundle public void startBundle() { try { - ingestionClient.connect(); + redisIngestionClient.connect(); } catch (RedisConnectionException e) { log.error("Connection to redis cannot be established ", e); } @@ -226,24 +226,24 @@ public void startBundle() { } private void executeBatch() throws Exception { - this.ingestionClient + this.redisIngestionClient .getBackOffExecutor() .execute( new Retriable() { @Override public void execute() throws ExecutionException, InterruptedException { - if (!ingestionClient.isConnected()) { - ingestionClient.connect(); + if (!redisIngestionClient.isConnected()) { + redisIngestionClient.connect(); } mutations.forEach( mutation -> { writeRecord(mutation); if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) { - ingestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis()); + redisIngestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis()); } }); - ingestionClient.sync(); + redisIngestionClient.sync(); mutations.clear(); } @@ -290,22 +290,22 @@ public void processElement(ProcessContext context) { private void writeRecord(RedisMutation mutation) { switch (mutation.getMethod()) { case APPEND: - ingestionClient.append(mutation.getKey(), mutation.getValue()); + redisIngestionClient.append(mutation.getKey(), mutation.getValue()); return; case SET: - ingestionClient.set(mutation.getKey(), mutation.getValue()); + redisIngestionClient.set(mutation.getKey(), mutation.getValue()); return; case LPUSH: - ingestionClient.lpush(mutation.getKey(), mutation.getValue()); + redisIngestionClient.lpush(mutation.getKey(), mutation.getValue()); return; case RPUSH: - ingestionClient.rpush(mutation.getKey(), mutation.getValue()); + redisIngestionClient.rpush(mutation.getKey(), mutation.getValue()); return; case SADD: - ingestionClient.sadd(mutation.getKey(), mutation.getValue()); + redisIngestionClient.sadd(mutation.getKey(), mutation.getValue()); return; case ZADD: - ingestionClient.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue()); + redisIngestionClient.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue()); return; default: throw new UnsupportedOperationException( @@ -333,7 +333,7 @@ public void finishBundle(FinishBundleContext context) @Teardown public void teardown() { - ingestionClient.shutdown(); + redisIngestionClient.shutdown(); } } } diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java b/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java index 7953f8c101..de1f74151a 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java @@ -63,8 +63,10 @@ public void shutdown() { @Override public void connect() { - this.connection = this.redisclient.connect(new ByteArrayCodec()); - this.commands = connection.async(); + if (!isConnected()) { + this.connection = this.redisclient.connect(new ByteArrayCodec()); + this.commands = connection.async(); + } } @Override @@ -75,6 +77,7 @@ public boolean isConnected() { @Override public void sync() { // Wait for some time for futures to complete + // TODO: should this be configurable? LettuceFutures.awaitAll(60, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0])); futures.clear(); } From 37d719826f654c396956076179ae1656d8d15918 Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Wed, 26 Feb 2020 08:52:32 +0800 Subject: [PATCH 5/8] Running spotless --- .../src/main/java/feast/store/serving/redis/RedisCustomIO.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index 31b9c0553f..633c2eb551 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -240,7 +240,8 @@ public void execute() throws ExecutionException, InterruptedException { writeRecord(mutation); if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) { - redisIngestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis()); + redisIngestionClient.pexpire( + mutation.getKey(), mutation.getExpiryMillis()); } }); redisIngestionClient.sync(); From 63f06815415db120c5a782570bed2f65f4fb5db4 Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Thu, 27 Feb 2020 12:12:00 +0800 Subject: [PATCH 6/8] Throw Exception if the job store config is null --- .../redis/JobStoreRedisConfig.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java index e0bce095b3..feb4a56dab 100644 --- a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java +++ b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java @@ -40,17 +40,19 @@ ClientResources jobStoreClientResources() { @Bean(destroyMethod = "shutdown") RedisClient jobStoreRedisClient( ClientResources jobStoreClientResources, FeastProperties feastProperties) { - try { - if (StoreProto.Store.StoreType.valueOf(feastProperties.getJobs().getStoreType()) - != StoreProto.Store.StoreType.REDIS) return null; - Map jobStoreConf = feastProperties.getJobs().getStoreOptions(); - RedisURI uri = - RedisURI.create(jobStoreConf.get("host"), Integer.parseInt(jobStoreConf.get("port"))); - return RedisClient.create(jobStoreClientResources, uri); - } catch (Exception e) { - // If the store type is empty or keys are not not properly set. - return null; - } + if (StoreProto.Store.StoreType.valueOf(feastProperties.getJobs().getStoreType()) + != StoreProto.Store.StoreType.REDIS) return null; + Map jobStoreConf = feastProperties.getJobs().getStoreOptions(); + // If job conf is empty throw StoreException + if (jobStoreConf == null + || jobStoreConf.get("host") == null + || jobStoreConf.get("host").isEmpty() + || jobStoreConf.get("port") == null + || jobStoreConf.get("port").isEmpty()) + throw new IllegalArgumentException("Store Configuration is not set"); + RedisURI uri = + RedisURI.create(jobStoreConf.get("host"), Integer.parseInt(jobStoreConf.get("port"))); + return RedisClient.create(jobStoreClientResources, uri); } @Bean(destroyMethod = "close") From c977b791cacae337585f3e1fbd2ecb4a3d768e97 Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Thu, 27 Feb 2020 12:26:39 +0800 Subject: [PATCH 7/8] Handle No enum constant RuntimeException --- .../serving/configuration/redis/JobStoreRedisConfig.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java index feb4a56dab..77d9262bcb 100644 --- a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java +++ b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java @@ -16,6 +16,7 @@ */ package feast.serving.configuration.redis; +import com.google.common.base.Enums; import feast.core.StoreProto; import feast.serving.FeastProperties; import io.lettuce.core.RedisClient; @@ -40,8 +41,11 @@ ClientResources jobStoreClientResources() { @Bean(destroyMethod = "shutdown") RedisClient jobStoreRedisClient( ClientResources jobStoreClientResources, FeastProperties feastProperties) { - if (StoreProto.Store.StoreType.valueOf(feastProperties.getJobs().getStoreType()) - != StoreProto.Store.StoreType.REDIS) return null; + StoreProto.Store.StoreType storeType = + Enums.getIfPresent( + StoreProto.Store.StoreType.class, feastProperties.getJobs().getStoreType()) + .orNull(); + if (storeType != StoreProto.Store.StoreType.REDIS) return null; Map jobStoreConf = feastProperties.getJobs().getStoreOptions(); // If job conf is empty throw StoreException if (jobStoreConf == null From e2349cad15eba49d7d0b49902195b4aede0eebae Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Thu, 27 Feb 2020 20:43:45 +0800 Subject: [PATCH 8/8] Future should be cleared everytime sync is called --- .../serving/redis/RedisStandaloneIngestionClient.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java b/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java index de1f74151a..d95ebbbf64 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java @@ -78,8 +78,11 @@ public boolean isConnected() { public void sync() { // Wait for some time for futures to complete // TODO: should this be configurable? - LettuceFutures.awaitAll(60, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0])); - futures.clear(); + try { + LettuceFutures.awaitAll(60, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0])); + } finally { + futures.clear(); + } } @Override