From 8658ccb08a7b46f83e08adc685b38c5049954a18 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 21 Jan 2020 11:09:30 +0800 Subject: [PATCH 1/6] Change job service connection to use a connection pool --- .../configuration/JobServiceConfig.java | 39 +++++++++++++------ .../configuration/ServingServiceConfig.java | 15 ------- .../service/BigQueryServingService.java | 11 +++--- .../service/RedisBackedJobService.java | 9 +++-- .../bigquery/BatchRetrievalQueryRunnable.java | 12 +++--- serving/src/main/resources/application.yml | 4 ++ 6 files changed, 47 insertions(+), 43 deletions(-) diff --git a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java index 2afbdaf90d..4c6b652c46 100644 --- a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java @@ -16,40 +16,55 @@ */ package feast.serving.configuration; -import feast.core.StoreProto.Store; -import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; +import feast.serving.FeastProperties; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; import feast.serving.service.RedisBackedJobService; import feast.serving.specs.CachedSpecService; +import java.util.Map; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; @Configuration public class JobServiceConfig { + public static final String DEFAULT_REDIS_MAX_CONN = "8"; + public static final String DEFAULT_REDIS_MAX_IDLE = "8"; + public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50"; + @Bean - public JobService jobService(Store jobStore, CachedSpecService specService) { + public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) { if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) { return new NoopJobService(); } - - switch (jobStore.getType()) { + StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType()); + Map storeOptions = feastProperties.getJobs().getStoreOptions(); + switch (storeType) { case REDIS: - RedisConfig redisConfig = jobStore.getRedisConfig(); - Jedis jedis = new Jedis(redisConfig.getHost(), redisConfig.getPort()); - return new RedisBackedJobService(jedis); + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxTotal( + Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN))); + jedisPoolConfig.setMaxIdle( + Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE))); + jedisPoolConfig.setMaxWaitMillis( + Integer.parseInt( + storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS))); + JedisPool jedisPool = + new JedisPool( + jedisPoolConfig, + storeOptions.get("host"), + Integer.parseInt(storeOptions.get("port"))); + return new RedisBackedJobService(jedisPool); case INVALID: case BIGQUERY: case CASSANDRA: case UNRECOGNIZED: default: throw new IllegalArgumentException( - String.format( - "Unsupported store type '%s' for job store name '%s'", - jobStore.getType(), jobStore.getName())); + String.format("Unsupported store type '%s' for job store", storeType)); } } } diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index ea6dbc6ef7..3cc115978a 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -22,12 +22,9 @@ import com.google.cloud.storage.StorageOptions; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; -import feast.core.StoreProto.Store.Builder; import feast.core.StoreProto.Store.RedisConfig; -import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; import feast.serving.FeastProperties; -import feast.serving.FeastProperties.JobProperties; import feast.serving.service.BigQueryServingService; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; @@ -47,18 +44,6 @@ public class ServingServiceConfig { private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class); - @Bean(name = "JobStore") - public Store jobStoreDefinition(FeastProperties feastProperties) { - JobProperties jobProperties = feastProperties.getJobs(); - if (feastProperties.getJobs().getStoreType().equals("")) { - return Store.newBuilder().build(); - } - Map options = jobProperties.getStoreOptions(); - Builder storeDefinitionBuilder = - Store.newBuilder().setType(StoreType.valueOf(jobProperties.getStoreType())); - return setStoreConfig(storeDefinitionBuilder, options); - } - private Store setStoreConfig(Store.Builder builder, Map options) { switch (builder.getType()) { case REDIS: diff --git a/serving/src/main/java/feast/serving/service/BigQueryServingService.java b/serving/src/main/java/feast/serving/service/BigQueryServingService.java index 0743245d16..f23cbbe64a 100644 --- a/serving/src/main/java/feast/serving/service/BigQueryServingService.java +++ b/serving/src/main/java/feast/serving/service/BigQueryServingService.java @@ -267,13 +267,12 @@ private TableId generateUUIDs(Table loadedEntityTable) { } private Job waitForJob(Job queryJob) throws InterruptedException { - Job completedJob = queryJob.waitFor( - RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)), - RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs))); + Job completedJob = + queryJob.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)), + RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs))); if (completedJob == null) { - throw Status.INTERNAL - .withDescription("Job no longer exists") - .asRuntimeException(); + throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException(); } else if (completedJob.getStatus().getError() != null) { throw Status.INTERNAL .withDescription("Job failed: " + completedJob.getStatus().getError()) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 7bfce55225..89b0953301 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -23,6 +23,7 @@ import org.joda.time.Duration; import org.slf4j.Logger; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; // TODO: Do rate limiting, currently if clients call get() or upsert() // and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait @@ -31,17 +32,18 @@ public class RedisBackedJobService implements JobService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class); - private final Jedis jedis; + private final JedisPool jedisPool; // Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory // and since users normally don't require info about relatively old jobs. private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds(); - public RedisBackedJobService(Jedis jedis) { - this.jedis = jedis; + public RedisBackedJobService(JedisPool jedisPool) { + this.jedisPool = jedisPool; } @Override public Optional get(String id) { + Jedis jedis = jedisPool.getResource(); String json = jedis.get(id); if (json == null) { return Optional.empty(); @@ -60,6 +62,7 @@ public Optional get(String id) { @Override public void upsert(Job job) { + Jedis jedis = jedisPool.getResource(); try { jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job)); jedis.expire(job.getId(), defaultExpirySeconds); diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java index a96c2bc8d7..61103af109 100644 --- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java +++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java @@ -336,13 +336,12 @@ private FieldValueList getTimestampLimits(String entityTableName) { } private Job waitForJob(Job queryJob) throws InterruptedException { - Job completedJob = queryJob.waitFor( - RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())), - RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs()))); + Job completedJob = + queryJob.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())), + RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs()))); if (completedJob == null) { - throw Status.INTERNAL - .withDescription("Job no longer exists") - .asRuntimeException(); + throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException(); } else if (completedJob.getStatus().getError() != null) { throw Status.INTERNAL .withDescription("Job failed: " + completedJob.getStatus().getError()) @@ -350,5 +349,4 @@ private Job waitForJob(Job queryJob) throws InterruptedException { } return completedJob; } - } diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 072787492f..96713c8028 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -47,6 +47,10 @@ feast: # store-options: # host: localhost # port: 6379 + # Optionally, you can configure the connection pool with the following items: + # max-conn: 8 + # max-idle: 8 + # max-wait-millis: 50 store-options: {} grpc: From 1dbcdfd68c9ef490c70e6b0731d2c6c7ea33a3e6 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 21 Jan 2020 12:10:11 +0800 Subject: [PATCH 2/6] Close connection --- .../feast/serving/service/RedisBackedJobService.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 89b0953301..a1f8e0ccf0 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -55,8 +55,11 @@ public Optional get(String id) { job = builder.build(); } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage())); + } finally { + if (jedis.isConnected()) { + jedis.close(); + } } - return Optional.ofNullable(job); } @@ -68,6 +71,10 @@ public void upsert(Job job) { jedis.expire(job.getId(), defaultExpirySeconds); } catch (Exception e) { log.error(String.format("Failed to upsert job: %s", e.getMessage())); + } finally { + if (jedis.isConnected()) { + jedis.close(); + } } } } From d2e81123f3da8b143c98e62a83ebf7728983e358 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 21 Jan 2020 13:41:45 +0800 Subject: [PATCH 3/6] Remove isConnected condition --- .../java/feast/serving/service/RedisBackedJobService.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index a1f8e0ccf0..78f134eedd 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -56,9 +56,7 @@ public Optional get(String id) { } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage())); } finally { - if (jedis.isConnected()) { - jedis.close(); - } + jedis.close(); } return Optional.ofNullable(job); } @@ -72,9 +70,7 @@ public void upsert(Job job) { } catch (Exception e) { log.error(String.format("Failed to upsert job: %s", e.getMessage())); } finally { - if (jedis.isConnected()) { - jedis.close(); - } + jedis.close(); } } } From 98bf375f7c2f8b9123eb8d2f4bc9ae84ef5f1bf0 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 21 Jan 2020 14:04:22 +0800 Subject: [PATCH 4/6] Add simple test to test that pool recovers broken connections --- serving/pom.xml | 6 +++ .../service/RedisBackedJobService.java | 24 ++++++---- .../service/RedisBackedJobServiceTest.java | 44 +++++++++++++++++++ 3 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java diff --git a/serving/pom.xml b/serving/pom.xml index c15881030e..be573be45c 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -250,6 +250,12 @@ com.fasterxml.jackson.dataformat jackson-dataformat-yaml + + + com.github.kstyrc + embedded-redis + test + diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 78f134eedd..9beab93c94 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -43,34 +43,40 @@ public RedisBackedJobService(JedisPool jedisPool) { @Override public Optional get(String id) { - Jedis jedis = jedisPool.getResource(); - String json = jedis.get(id); - if (json == null) { - return Optional.empty(); - } + Jedis jedis = null; Job job = null; - Builder builder = Job.newBuilder(); try { + jedis = jedisPool.getResource(); + String json = jedis.get(id); + if (json == null) { + return Optional.empty(); + } + Builder builder = Job.newBuilder(); JsonFormat.parser().merge(json, builder); job = builder.build(); } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage())); } finally { - jedis.close(); + if (jedis != null) { + jedis.close(); + } } return Optional.ofNullable(job); } @Override public void upsert(Job job) { - Jedis jedis = jedisPool.getResource(); + Jedis jedis = null; try { + jedis = jedisPool.getResource(); jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job)); jedis.expire(job.getId(), defaultExpirySeconds); } catch (Exception e) { log.error(String.format("Failed to upsert job: %s", e.getMessage())); } finally { - jedis.close(); + if (jedis != null) { + jedis.close(); + } } } } diff --git a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java new file mode 100644 index 0000000000..f4047f5112 --- /dev/null +++ b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java @@ -0,0 +1,44 @@ +package feast.serving.service; + +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.embedded.RedisServer; + +public class RedisBackedJobServiceTest { + private static String REDIS_HOST = "localhost"; + private static int REDIS_PORT = 51235; + private RedisServer redis; + + @Before + public void setUp() throws IOException { + redis = new RedisServer(REDIS_PORT); + redis.start(); + } + + @After + public void teardown() { + redis.stop(); + } + + @Test + public void shouldRecoverIfRedisConnectionIsLost() { + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxTotal(1); + jedisPoolConfig.setMaxWaitMillis(10); + JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT); + RedisBackedJobService jobService = new RedisBackedJobService(jedisPool); + jobService.get("does not exist"); + redis.stop(); + try { + jobService.get("does not exist"); + } catch (Exception e) { + // pass, this should fail, and return a broken connection to the pool + } + redis.start(); + jobService.get("does not exist"); + } +} \ No newline at end of file From 3a8f80e233904792577b5e91967604128768a6e4 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 21 Jan 2020 14:30:01 +0800 Subject: [PATCH 5/6] Catch JedisExceptions separately --- .../serving/service/RedisBackedJobService.java | 3 +++ .../service/RedisBackedJobServiceTest.java | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 9beab93c94..8e16ba8452 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +import redis.clients.jedis.exceptions.JedisException; // TODO: Do rate limiting, currently if clients call get() or upsert() // and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait @@ -54,6 +55,8 @@ public Optional get(String id) { Builder builder = Job.newBuilder(); JsonFormat.parser().merge(json, builder); job = builder.build(); + } catch (JedisException e) { + log.error(String.format("Failed to connect to the redis instance: %s", e)); } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage())); } finally { diff --git a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java index f4047f5112..9247375f59 100644 --- a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java @@ -1,3 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package feast.serving.service; import java.io.IOException; @@ -41,4 +57,4 @@ public void shouldRecoverIfRedisConnectionIsLost() { redis.start(); jobService.get("does not exist"); } -} \ No newline at end of file +} From b3a2cf5ca05740f43ed921bf5d81023dc758aee0 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 21 Jan 2020 14:40:27 +0800 Subject: [PATCH 6/6] Catch only JedisConnectionException --- .../java/feast/serving/service/RedisBackedJobService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 8e16ba8452..230e20cd78 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -24,7 +24,7 @@ import org.slf4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; -import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.exceptions.JedisConnectionException; // TODO: Do rate limiting, currently if clients call get() or upsert() // and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait @@ -55,7 +55,7 @@ public Optional get(String id) { Builder builder = Job.newBuilder(); JsonFormat.parser().merge(json, builder); job = builder.build(); - } catch (JedisException e) { + } catch (JedisConnectionException e) { log.error(String.format("Failed to connect to the redis instance: %s", e)); } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage()));