Skip to content

Commit

Permalink
Merge pull request feast-dev#72 from farfetch-external/isacabe/KE-998…
Browse files Browse the repository at this point in the history
…_redis_empty_pass

(ke- 998) Redis connector allows  "empty password"
  • Loading branch information
isabelcabezasm authored Jul 17, 2020
2 parents 8cc92a8 + f775643 commit 8ac0733
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 13 deletions.
3 changes: 2 additions & 1 deletion infra/docker-compose/serving/databricks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ feast:
- name: "*"
project: "*"
job_store:
redis_host: redis
redis_host: ${REDIS_HOST}
redis_pass: '${REDIS_PASS}'
redis_port: 6379

grpc:
Expand Down
12 changes: 10 additions & 2 deletions ingestion/src/test/java/feast/ingestion/ImportJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.joda.time.Duration;
import org.junit.AfterClass;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class ImportJobTest {

private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6380;
// test without password
private static final String REDIS_PASS = "";

// No of samples of feature row that will be generated and used for testing.
// Note that larger no of samples will increase completion time for ingestion.
Expand Down Expand Up @@ -153,12 +156,17 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()

FeatureSet featureSet = FeatureSet.newBuilder().setSpec(spec).build();

RedisConfig.Builder redisconfiguration =
RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT);
if (StringUtils.trimToNull(REDIS_PASS) != null) {
redisconfiguration.setPass(REDIS_PASS);
}

Store redis =
Store.newBuilder()
.setName(StoreType.REDIS.toString())
.setType(StoreType.REDIS)
.setRedisConfig(
RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build())
.setRedisConfig(redisconfiguration.build())
.addSubscriptions(
Subscription.newBuilder()
.setProject(spec.getProject())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.resource.DefaultClientResources;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;

Expand All @@ -45,7 +46,11 @@ public class RedisBackedJobService implements JobService {
public RedisBackedJobService(FeastProperties.JobStoreProperties jobStoreProperties) {
RedisURI uri =
RedisURI.create(jobStoreProperties.getRedisHost(), jobStoreProperties.getRedisPort());
uri.setPassword(jobStoreProperties.getRedisPass());
String password = jobStoreProperties.getRedisPass();
if (StringUtils.trimToNull(password) != null) {
uri.setPassword(password);
}

this.syncCommand =
RedisClient.create(DefaultClientResources.create(), uri)
.connect(new ByteArrayCodec())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -76,7 +77,11 @@ public VoidFunction2<Dataset<byte[]>, Long> configure() {
redisConfig.getHost(),
redisConfig.getPort(),
java.time.Duration.ofMillis(DEFAULT_TIMEOUT));
redisuri.setPassword(redisConfig.getPass());

String password = redisConfig.getPass();
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}

return new RedisWriter(jobId, broadcastedWriter, redisuri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -589,7 +590,11 @@ public static void validateRedis(
RedisURI redisuri =
new RedisURI(
redisConfig.getHost(), redisConfig.getPort(), java.time.Duration.ofMillis(2000));
redisuri.setPassword(redisConfig.getPass());

String password = redisConfig.getPass();
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}

RedisClient redisClient = RedisClient.create(redisuri);
StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(new ByteArrayCodec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public class RedisClusterOnlineRetriever implements OnlineRetriever {

Expand All @@ -58,7 +59,10 @@ public static OnlineRetriever create(Map<String, String> config) {
RedisURI redisuri =
RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1]));
if (hostPortSplit.length == 3) {
redisuri.setPassword(hostPortSplit[2]);
String password = hostPortSplit[2];
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}
}
return redisuri;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public class RedisOnlineRetriever implements OnlineRetriever {

Expand All @@ -50,7 +51,10 @@ private RedisOnlineRetriever(StatefulRedisConnection<byte[], byte[]> connection)

public static OnlineRetriever create(Map<String, String> config) {
RedisURI redisuri = RedisURI.create(config.get("host"), Integer.parseInt(config.get("port")));
redisuri.setPassword(config.get("pass"));
String password = config.get("pass");
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}

StatefulRedisConnection<byte[], byte[]> connection =
RedisClient.create(redisuri).connect(new ByteArrayCodec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;

public class RedisClusterIngestionClient implements RedisIngestionClient {
Expand All @@ -50,7 +51,10 @@ public RedisClusterIngestionClient(StoreProto.Store.RedisClusterConfig redisClus
RedisURI redisuri =
RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1]));
if (hostPortSplit.length == 3) {
redisuri.setPassword(hostPortSplit[2]);
String password = hostPortSplit[2];
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}
}
return redisuri;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;

@AutoValue
public abstract class RedisFeatureSink implements FeatureSink {
Expand Down Expand Up @@ -82,7 +83,11 @@ public abstract static class Builder {
public void prepareWrite(FeatureSet featureSet) {
if (getRedisConfig() != null) {
RedisURI redisuri = RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort());
redisuri.setPassword(getRedisConfig().getPass());

String password = getRedisConfig().getPass();
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}
RedisClient redisClient = RedisClient.create(redisuri);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import io.lettuce.core.codec.ByteArrayCodec;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;

public class RedisStandaloneIngestionClient implements RedisIngestionClient {
private final String host;
private final String pass;
private final String password;
private final int port;
private final BackOffExecutor backOffExecutor;
private RedisClient redisclient;
Expand All @@ -44,7 +45,7 @@ public class RedisStandaloneIngestionClient implements RedisIngestionClient {
public RedisStandaloneIngestionClient(StoreProto.Store.RedisConfig redisConfig) {
this.host = redisConfig.getHost();
this.port = redisConfig.getPort();
this.pass = redisConfig.getPass();
this.password = redisConfig.getPass();
long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
this.backOffExecutor =
new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
Expand All @@ -53,7 +54,9 @@ public RedisStandaloneIngestionClient(StoreProto.Store.RedisConfig redisConfig)
@Override
public void setup() {
RedisURI redisuri = new RedisURI(host, port, java.time.Duration.ofMillis(DEFAULT_TIMEOUT));
redisuri.setPassword(pass);
if (StringUtils.trimToNull(password) != null) {
redisuri.setPassword(password);
}
this.redisclient = RedisClient.create(redisuri);
}

Expand Down

0 comments on commit 8ac0733

Please sign in to comment.