diff --git a/infra/docker-compose/serving/databricks.yml b/infra/docker-compose/serving/databricks.yml index 33a3554a57..ea39302169 100644 --- a/infra/docker-compose/serving/databricks.yml +++ b/infra/docker-compose/serving/databricks.yml @@ -20,7 +20,8 @@ feast: - name: "*" project: "*" job_store: - redis_host: redis + redis_host: ${REDIS_HOST} + redis_pass: '${REDIS_PASS}' redis_port: 6379 grpc: diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 39e4296378..dc6942316a 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.joda.time.Duration; import org.junit.AfterClass; @@ -84,6 +85,8 @@ public class ImportJobTest { private static final String REDIS_HOST = "localhost"; private static final int REDIS_PORT = 6380; + // test without password + private static final String REDIS_PASS = ""; // No of samples of feature row that will be generated and used for testing. // Note that larger no of samples will increase completion time for ingestion. @@ -153,12 +156,17 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() FeatureSet featureSet = FeatureSet.newBuilder().setSpec(spec).build(); + RedisConfig.Builder redisconfiguration = + RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT); + if (StringUtils.trimToNull(REDIS_PASS) != null) { + redisconfiguration.setPass(REDIS_PASS); + } + Store redis = Store.newBuilder() .setName(StoreType.REDIS.toString()) .setType(StoreType.REDIS) - .setRedisConfig( - RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build()) + .setRedisConfig(redisconfiguration.build()) .addSubscriptions( Subscription.newBuilder() .setProject(spec.getProject()) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 28c32885ee..e1ed70d5aa 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -27,6 +27,7 @@ import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.resource.DefaultClientResources; import java.util.Optional; +import org.apache.commons.lang3.StringUtils; import org.joda.time.Duration; import org.slf4j.Logger; @@ -45,7 +46,11 @@ public class RedisBackedJobService implements JobService { public RedisBackedJobService(FeastProperties.JobStoreProperties jobStoreProperties) { RedisURI uri = RedisURI.create(jobStoreProperties.getRedisHost(), jobStoreProperties.getRedisPort()); - uri.setPassword(jobStoreProperties.getRedisPass()); + String password = jobStoreProperties.getRedisPass(); + if (StringUtils.trimToNull(password) != null) { + uri.setPassword(password); + } + this.syncCommand = RedisClient.create(DefaultClientResources.create(), uri) .connect(new ByteArrayCodec()) diff --git a/spark/spark-ingestion-job/src/main/java/feast/spark/ingestion/redis/SparkRedisSink.java b/spark/spark-ingestion-job/src/main/java/feast/spark/ingestion/redis/SparkRedisSink.java index 6da65ee626..6000923982 100644 --- a/spark/spark-ingestion-job/src/main/java/feast/spark/ingestion/redis/SparkRedisSink.java +++ b/spark/spark-ingestion-job/src/main/java/feast/spark/ingestion/redis/SparkRedisSink.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.broadcast.Broadcast; @@ -76,7 +77,11 @@ public VoidFunction2, Long> configure() { redisConfig.getHost(), redisConfig.getPort(), java.time.Duration.ofMillis(DEFAULT_TIMEOUT)); - redisuri.setPassword(redisConfig.getPass()); + + String password = redisConfig.getPass(); + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } return new RedisWriter(jobId, broadcastedWriter, redisuri); } diff --git a/spark/spark-ingestion-job/src/test/java/feast/test/TestUtil.java b/spark/spark-ingestion-job/src/test/java/feast/test/TestUtil.java index c572dccbe3..cc5f490e9f 100644 --- a/spark/spark-ingestion-job/src/test/java/feast/test/TestUtil.java +++ b/spark/spark-ingestion-job/src/test/java/feast/test/TestUtil.java @@ -65,6 +65,7 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -589,7 +590,11 @@ public static void validateRedis( RedisURI redisuri = new RedisURI( redisConfig.getHost(), redisConfig.getPort(), java.time.Duration.ofMillis(2000)); - redisuri.setPassword(redisConfig.getPass()); + + String password = redisConfig.getPass(); + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } RedisClient redisClient = RedisClient.create(redisuri); StatefulRedisConnection connection = redisClient.connect(new ByteArrayCodec()); diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java index 0be8484eb3..35ecb16ebb 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; public class RedisClusterOnlineRetriever implements OnlineRetriever { @@ -58,7 +59,10 @@ public static OnlineRetriever create(Map config) { RedisURI redisuri = RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1])); if (hostPortSplit.length == 3) { - redisuri.setPassword(hostPortSplit[2]); + String password = hostPortSplit[2]; + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } } return redisuri; }) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java index f4898ae747..488627d615 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; public class RedisOnlineRetriever implements OnlineRetriever { @@ -50,7 +51,10 @@ private RedisOnlineRetriever(StatefulRedisConnection connection) public static OnlineRetriever create(Map config) { RedisURI redisuri = RedisURI.create(config.get("host"), Integer.parseInt(config.get("port"))); - redisuri.setPassword(config.get("pass")); + String password = config.get("pass"); + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } StatefulRedisConnection connection = RedisClient.create(redisuri).connect(new ByteArrayCodec()); diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java index 5af6b82d34..049b53fff2 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.joda.time.Duration; public class RedisClusterIngestionClient implements RedisIngestionClient { @@ -50,7 +51,10 @@ public RedisClusterIngestionClient(StoreProto.Store.RedisClusterConfig redisClus RedisURI redisuri = RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1])); if (hostPortSplit.length == 3) { - redisuri.setPassword(hostPortSplit[2]); + String password = hostPortSplit[2]; + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } } return redisuri; }) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java index 1b2e54f79f..4852d9371c 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.lang3.StringUtils; @AutoValue public abstract class RedisFeatureSink implements FeatureSink { @@ -82,7 +83,11 @@ public abstract static class Builder { public void prepareWrite(FeatureSet featureSet) { if (getRedisConfig() != null) { RedisURI redisuri = RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort()); - redisuri.setPassword(getRedisConfig().getPass()); + + String password = getRedisConfig().getPass(); + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } RedisClient redisClient = RedisClient.create(redisuri); try { diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisStandaloneIngestionClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisStandaloneIngestionClient.java index 34d9f8686f..74fcc83d80 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisStandaloneIngestionClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisStandaloneIngestionClient.java @@ -28,11 +28,12 @@ import io.lettuce.core.codec.ByteArrayCodec; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.joda.time.Duration; public class RedisStandaloneIngestionClient implements RedisIngestionClient { private final String host; - private final String pass; + private final String password; private final int port; private final BackOffExecutor backOffExecutor; private RedisClient redisclient; @@ -44,7 +45,7 @@ public class RedisStandaloneIngestionClient implements RedisIngestionClient { public RedisStandaloneIngestionClient(StoreProto.Store.RedisConfig redisConfig) { this.host = redisConfig.getHost(); this.port = redisConfig.getPort(); - this.pass = redisConfig.getPass(); + this.password = redisConfig.getPass(); long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1; this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs)); @@ -53,7 +54,9 @@ public RedisStandaloneIngestionClient(StoreProto.Store.RedisConfig redisConfig) @Override public void setup() { RedisURI redisuri = new RedisURI(host, port, java.time.Duration.ofMillis(DEFAULT_TIMEOUT)); - redisuri.setPassword(pass); + if (StringUtils.trimToNull(password) != null) { + redisuri.setPassword(password); + } this.redisclient = RedisClient.create(redisuri); }