Skip to content

Commit

Permalink
Change RedisBackedJobService to use a connection pool (#439)
Browse files Browse the repository at this point in the history
* Change job service connection to use a connection pool

* Close connection

* Remove isConnected condition

* Add simple test to test that pool recovers broken connections

* Catch JedisExceptions separately

* Catch only JedisConnectionException
  • Loading branch information
Chen Zhiling authored and feast-ci-bot committed Jan 21, 2020
1 parent fe520a9 commit 0a8987e
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 49 deletions.
6 changes: 6 additions & 0 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,55 @@
*/
package feast.serving.configuration;

import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
import feast.serving.FeastProperties;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
import feast.serving.service.RedisBackedJobService;
import feast.serving.specs.CachedSpecService;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Spring configuration that provides the {@link JobService} bean.
 *
 * <p>NOTE(review): this span appears to contain both sides of a rendered diff (two {@code
 * jobService} signatures, two {@code switch} heads, an old single-connection REDIS branch followed
 * by a pooled one). Confirm which side is current before compiling.
 */
@Configuration
public class JobServiceConfig {

// Fallbacks for the "max-conn", "max-idle" and "max-wait-millis" store options. They are kept as
// strings because they go through Integer.parseInt exactly like the configured values do.
public static final String DEFAULT_REDIS_MAX_CONN = "8";
public static final String DEFAULT_REDIS_MAX_IDLE = "8";
public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50";

/**
 * Builds the job service used by serving.
 *
 * <p>Returns a {@link NoopJobService} unless the serving store reported by {@code specService} is
 * of type BIGQUERY. Otherwise the job store type decides the implementation; only REDIS is
 * supported and yields a {@link RedisBackedJobService}.
 *
 * @throws IllegalArgumentException if the job store type is anything other than REDIS
 */
@Bean
public JobService jobService(Store jobStore, CachedSpecService specService) {
public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) {
// Every serving store other than BigQuery gets the no-op implementation.
if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) {
return new NoopJobService();
}

switch (jobStore.getType()) {
StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType());
Map<String, String> storeOptions = feastProperties.getJobs().getStoreOptions();
switch (storeType) {
case REDIS:
RedisConfig redisConfig = jobStore.getRedisConfig();
Jedis jedis = new Jedis(redisConfig.getHost(), redisConfig.getPort());
return new RedisBackedJobService(jedis);
// Pool sizing and wait time come from the store options, falling back to the defaults above.
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(
Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN)));
jedisPoolConfig.setMaxIdle(
Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE)));
jedisPoolConfig.setMaxWaitMillis(
Integer.parseInt(
storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS)));
// "host" and "port" are read without a fallback value.
JedisPool jedisPool =
new JedisPool(
jedisPoolConfig,
storeOptions.get("host"),
Integer.parseInt(storeOptions.get("port")));
return new RedisBackedJobService(jedisPool);
// All remaining store types fall through to the same rejection.
case INVALID:
case BIGQUERY:
case CASSANDRA:
case UNRECOGNIZED:
default:
throw new IllegalArgumentException(
String.format(
"Unsupported store type '%s' for job store name '%s'",
jobStore.getType(), jobStore.getName()));
String.format("Unsupported store type '%s' for job store", storeType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
import com.google.cloud.storage.StorageOptions;
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.BigQueryConfig;
import feast.core.StoreProto.Store.Builder;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
import feast.core.StoreProto.Store.Subscription;
import feast.serving.FeastProperties;
import feast.serving.FeastProperties.JobProperties;
import feast.serving.service.BigQueryServingService;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
Expand All @@ -47,18 +44,6 @@ public class ServingServiceConfig {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class);

/**
 * Exposes the job store definition as the {@code JobStore} bean.
 *
 * <p>An empty configured store type yields a default (unset) {@link Store}; otherwise the store
 * type is parsed into a {@link StoreType} and the store options are applied through
 * {@code setStoreConfig}.
 *
 * @param feastProperties application properties holding the job store type and options
 * @return the store definition built from the job properties
 */
@Bean(name = "JobStore")
public Store jobStoreDefinition(FeastProperties feastProperties) {
  JobProperties jobs = feastProperties.getJobs();
  String configuredType = jobs.getStoreType();
  boolean typeConfigured = !configuredType.equals("");
  if (typeConfigured) {
    Map<String, String> storeOptions = jobs.getStoreOptions();
    Builder typedStore = Store.newBuilder().setType(StoreType.valueOf(configuredType));
    return setStoreConfig(typedStore, storeOptions);
  }
  // No job store configured: hand back an empty definition.
  return Store.newBuilder().build();
}

private Store setStoreConfig(Store.Builder builder, Map<String, String> options) {
switch (builder.getType()) {
case REDIS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,12 @@ private TableId generateUUIDs(Table loadedEntityTable) {
}

private Job waitForJob(Job queryJob) throws InterruptedException {
Job completedJob = queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
Job completedJob =
queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
if (completedJob == null) {
throw Status.INTERNAL
.withDescription("Job no longer exists")
.asRuntimeException();
throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
} else if (completedJob.getStatus().getError() != null) {
throw Status.INTERNAL
.withDescription("Job failed: " + completedJob.getStatus().getError())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.joda.time.Duration;
import org.slf4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

// TODO: Do rate limiting, currently if clients call get() or upsert()
// at an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait
Expand All @@ -31,40 +33,53 @@
public class RedisBackedJobService implements JobService {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class);
private final Jedis jedis;
private final JedisPool jedisPool;
// Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory
// and since users normally don't require info about relatively old jobs.
private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds();

/**
 * Stores the Redis handle supplied by the caller in a field; nothing else happens here.
 *
 * <p>NOTE(review): two constructor headers appear back to back (one taking a {@code Jedis}, one
 * taking a {@code JedisPool}) with a single closing brace. This looks like both sides of a diff;
 * confirm which one is current.
 */
public RedisBackedJobService(Jedis jedis) {
this.jedis = jedis;
public RedisBackedJobService(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}

/**
 * Looks up the value stored under {@code id} and parses it as the JSON form of a {@code Job}.
 *
 * <p>Failures are logged rather than thrown: a connection error or any other exception inside the
 * try block results in an empty {@link Optional}.
 *
 * <p>NOTE(review): {@code json} and {@code builder} are each declared twice and the key is read
 * both before and inside the try block. This looks like both sides of a diff; confirm which lines
 * are current.
 *
 * @param id key under which the job was stored
 * @return the parsed job, or empty when the key has no value or the lookup/parsing failed
 */
@Override
public Optional<Job> get(String id) {
String json = jedis.get(id);
if (json == null) {
return Optional.empty();
}
Jedis jedis = null;
Job job = null;
Builder builder = Job.newBuilder();
try {
// Borrow a connection from the pool for this call only.
jedis = jedisPool.getResource();
String json = jedis.get(id);
if (json == null) {
return Optional.empty();
}
Builder builder = Job.newBuilder();
JsonFormat.parser().merge(json, builder);
job = builder.build();
} catch (JedisConnectionException e) {
log.error(String.format("Failed to connect to the redis instance: %s", e));
} catch (Exception e) {
// Catch-all: the message assumes a JSON parse failure, but every other exception raised in the
// try block ends up here as well.
log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage()));
} finally {
// Runs on the early return above too, so a borrowed connection is always closed.
if (jedis != null) {
jedis.close();
}
}

// job is still null when either catch branch ran, which maps to Optional.empty().
return Optional.ofNullable(job);
}

/**
 * Stores {@code job} under its id as compact JSON, expiring after {@code defaultExpirySeconds}.
 *
 * <p>This is best effort: any failure (borrowing a connection, serialising the job, talking to
 * Redis) is logged and not rethrown, so callers are never interrupted by a job-store outage.
 *
 * @param job job whose state should be created or overwritten
 */
@Override
public void upsert(Job job) {
  // try-with-resources closes the borrowed connection on every path, handing it back to the pool.
  try (Jedis jedis = jedisPool.getResource()) {
    String json = JsonFormat.printer().omittingInsignificantWhitespace().print(job);
    // SETEX writes value and expiry in a single command. With a separate SET and EXPIRE, a failure
    // between the two would leave a key that never expires.
    jedis.setex(job.getId(), defaultExpirySeconds, json);
  } catch (Exception e) {
    // Pass the exception itself as the last argument so the stack trace is kept in the log.
    log.error("Failed to upsert job: {}", e.getMessage(), e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,19 +336,17 @@ private FieldValueList getTimestampLimits(String entityTableName) {
}

/**
 * Waits for {@code queryJob} to complete, using the configured initial retry delay and total
 * timeout (both in seconds), and returns the completed job.
 *
 * <p>NOTE(review): the {@code waitFor} call and the "Job no longer exists" throw each appear
 * twice, differing only in line wrapping. This looks like both sides of a formatting diff; confirm
 * which lines are current.
 *
 * @param queryJob job to wait for
 * @return the completed job, which is non-null and has no error set
 * @throws InterruptedException declared by this method; presumably raised while waiting
 */
private Job waitForJob(Job queryJob) throws InterruptedException {
Job completedJob = queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
Job completedJob =
queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
// A null result is reported as "Job no longer exists".
if (completedJob == null) {
throw Status.INTERNAL
.withDescription("Job no longer exists")
.asRuntimeException();
throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
} else if (completedJob.getStatus().getError() != null) {
// The job finished but carries an error; surface it in the exception description.
throw Status.INTERNAL
.withDescription("Job failed: " + completedJob.getStatus().getError())
.asRuntimeException();
}
return completedJob;
}

}
4 changes: 4 additions & 0 deletions serving/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ feast:
# store-options:
# host: localhost
# port: 6379
# Optionally, you can configure the connection pool with the following items:
# max-conn: 8
# max-idle: 8
# max-wait-millis: 50
store-options: {}

grpc:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.service;

import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.embedded.RedisServer;

/**
 * Checks that {@link RedisBackedJobService} keeps working after the Redis server behind its
 * connection pool goes away and comes back.
 */
public class RedisBackedJobServiceTest {
  private static final String REDIS_HOST = "localhost";
  private static final int REDIS_PORT = 51235;
  private RedisServer redis;

  @Before
  public void setUp() throws IOException {
    redis = new RedisServer(REDIS_PORT);
    redis.start();
  }

  @After
  public void teardown() {
    // setUp may have failed before the server was created; don't mask that with an NPE here.
    if (redis != null) {
      redis.stop();
    }
  }

  @Test
  public void shouldRecoverIfRedisConnectionIsLost() {
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    // A single pooled connection forces the broken connection to be the one that gets reused.
    jedisPoolConfig.setMaxTotal(1);
    jedisPoolConfig.setMaxWaitMillis(10);
    // Close the pool when the test ends so its connection does not leak into other tests.
    try (JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT)) {
      RedisBackedJobService jobService = new RedisBackedJobService(jedisPool);
      jobService.get("does not exist");
      redis.stop();
      try {
        jobService.get("does not exist");
      } catch (Exception ignored) {
        // Expected while the server is down: the call fails and the broken connection goes back
        // to the pool.
      }
      redis.start();
      // Must complete without throwing now that the server is reachable again.
      jobService.get("does not exist");
    }
  }
}

0 comments on commit 0a8987e

Please sign in to comment.