Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change RedisBackedJobService to use a connection pool #439

Merged
merged 6 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,55 @@
*/
package feast.serving.configuration;

import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
import feast.serving.FeastProperties;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
import feast.serving.service.RedisBackedJobService;
import feast.serving.specs.CachedSpecService;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
public class JobServiceConfig {

public static final String DEFAULT_REDIS_MAX_CONN = "8";
public static final String DEFAULT_REDIS_MAX_IDLE = "8";
public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50";

@Bean
public JobService jobService(Store jobStore, CachedSpecService specService) {
public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) {
if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) {
return new NoopJobService();
}

switch (jobStore.getType()) {
StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType());
Map<String, String> storeOptions = feastProperties.getJobs().getStoreOptions();
switch (storeType) {
case REDIS:
RedisConfig redisConfig = jobStore.getRedisConfig();
Jedis jedis = new Jedis(redisConfig.getHost(), redisConfig.getPort());
return new RedisBackedJobService(jedis);
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(
Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN)));
jedisPoolConfig.setMaxIdle(
Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE)));
jedisPoolConfig.setMaxWaitMillis(
Integer.parseInt(
storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS)));
JedisPool jedisPool =
new JedisPool(
jedisPoolConfig,
storeOptions.get("host"),
Integer.parseInt(storeOptions.get("port")));
return new RedisBackedJobService(jedisPool);
case INVALID:
case BIGQUERY:
case CASSANDRA:
case UNRECOGNIZED:
default:
throw new IllegalArgumentException(
String.format(
"Unsupported store type '%s' for job store name '%s'",
jobStore.getType(), jobStore.getName()));
String.format("Unsupported store type '%s' for job store", storeType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
import com.google.cloud.storage.StorageOptions;
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.BigQueryConfig;
import feast.core.StoreProto.Store.Builder;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
import feast.core.StoreProto.Store.Subscription;
import feast.serving.FeastProperties;
import feast.serving.FeastProperties.JobProperties;
import feast.serving.service.BigQueryServingService;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
Expand All @@ -47,18 +44,6 @@ public class ServingServiceConfig {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class);

@Bean(name = "JobStore")
public Store jobStoreDefinition(FeastProperties feastProperties) {
JobProperties jobProperties = feastProperties.getJobs();
if (feastProperties.getJobs().getStoreType().equals("")) {
return Store.newBuilder().build();
}
Map<String, String> options = jobProperties.getStoreOptions();
Builder storeDefinitionBuilder =
Store.newBuilder().setType(StoreType.valueOf(jobProperties.getStoreType()));
return setStoreConfig(storeDefinitionBuilder, options);
}

private Store setStoreConfig(Store.Builder builder, Map<String, String> options) {
switch (builder.getType()) {
case REDIS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,12 @@ private TableId generateUUIDs(Table loadedEntityTable) {
}

private Job waitForJob(Job queryJob) throws InterruptedException {
Job completedJob = queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
Job completedJob =
queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
if (completedJob == null) {
throw Status.INTERNAL
.withDescription("Job no longer exists")
.asRuntimeException();
throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
} else if (completedJob.getStatus().getError() != null) {
throw Status.INTERNAL
.withDescription("Job failed: " + completedJob.getStatus().getError())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.joda.time.Duration;
import org.slf4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

// TODO: Do rate limiting, currently if clients call get() or upsert()
// and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait
Expand All @@ -31,40 +33,53 @@
public class RedisBackedJobService implements JobService {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class);
private final Jedis jedis;
private final JedisPool jedisPool;
// Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory
// and since users normally don't require info about relatively old jobs.
private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds();

public RedisBackedJobService(Jedis jedis) {
this.jedis = jedis;
public RedisBackedJobService(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}

@Override
public Optional<Job> get(String id) {
String json = jedis.get(id);
if (json == null) {
return Optional.empty();
}
Jedis jedis = null;
Job job = null;
Builder builder = Job.newBuilder();
try {
jedis = jedisPool.getResource();
String json = jedis.get(id);
if (json == null) {
return Optional.empty();
}
Builder builder = Job.newBuilder();
JsonFormat.parser().merge(json, builder);
job = builder.build();
} catch (JedisConnectionException e) {
log.error(String.format("Failed to connect to the redis instance: %s", e));
} catch (Exception e) {
log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage()));
zhilingc marked this conversation as resolved.
Show resolved Hide resolved
} finally {
if (jedis != null) {
jedis.close();
}
}

return Optional.ofNullable(job);
}

@Override
public void upsert(Job job) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job));
jedis.expire(job.getId(), defaultExpirySeconds);
} catch (Exception e) {
log.error(String.format("Failed to upsert job: %s", e.getMessage()));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,19 +336,17 @@ private FieldValueList getTimestampLimits(String entityTableName) {
}

private Job waitForJob(Job queryJob) throws InterruptedException {
Job completedJob = queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
Job completedJob =
queryJob.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
if (completedJob == null) {
throw Status.INTERNAL
.withDescription("Job no longer exists")
.asRuntimeException();
throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
} else if (completedJob.getStatus().getError() != null) {
throw Status.INTERNAL
.withDescription("Job failed: " + completedJob.getStatus().getError())
.asRuntimeException();
}
return completedJob;
}

}
4 changes: 4 additions & 0 deletions serving/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ feast:
# store-options:
# host: localhost
# port: 6379
# Optionally, you can configure the connection pool with the following items:
# max-conn: 8
# max-idle: 8
# max-wait-millis: 50
store-options: {}

grpc:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.service;

import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.embedded.RedisServer;

public class RedisBackedJobServiceTest {
private static String REDIS_HOST = "localhost";
private static int REDIS_PORT = 51235;
private RedisServer redis;

@Before
public void setUp() throws IOException {
redis = new RedisServer(REDIS_PORT);
redis.start();
}

@After
public void teardown() {
redis.stop();
}

@Test
public void shouldRecoverIfRedisConnectionIsLost() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(1);
jedisPoolConfig.setMaxWaitMillis(10);
JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT);
RedisBackedJobService jobService = new RedisBackedJobService(jedisPool);
jobService.get("does not exist");
redis.stop();
try {
jobService.get("does not exist");
} catch (Exception e) {
// pass, this should fail, and return a broken connection to the pool
}
redis.start();
jobService.get("does not exist");
}
}