Skip to content

Commit

Permalink
[NO_REVIEW] JAMES-4074 Support Redis Sentinel password
Browse files Browse the repository at this point in the history
  • Loading branch information
hung phan committed Sep 29, 2024
1 parent c2dc9e8 commit e631d83
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import com.google.common.base.{MoreObjects, Preconditions}
import eu.timepit.refined
import eu.timepit.refined.api.Refined
import eu.timepit.refined.collection.NonEmpty
import io.lettuce.core.{ReadFrom, RedisURI}
import io.lettuce.core.{ReadFrom, RedisCredentials, RedisCredentialsProvider, RedisURI}
import org.apache.commons.configuration2.Configuration
import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, MASTER_REPLICA_TOPOLOGY, REDIS_READ_FROM_DEFAULT_VALUE, REDIS_READ_FROM_PROPERTY_NAME, SENTINEL_TOPOLOGY, STANDALONE_TOPOLOGY}
import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, MASTER_REPLICA_TOPOLOGY, REDIS_READ_FROM_DEFAULT_VALUE, REDIS_READ_FROM_PROPERTY_NAME, REDIS_SENTINEL_PASSWORD, SENTINEL_TOPOLOGY, STANDALONE_TOPOLOGY}
import org.apache.james.backends.redis.RedisUris.{REDIS_URL_PROPERTY_NAME, RedisUris}
import org.slf4j.{Logger, LoggerFactory}

object RedisConfiguration {
val REDIS_READ_FROM_PROPERTY_NAME = "redis.readFrom"
val REDIS_SENTINEL_PASSWORD = "redis.sentinelPassword"
val STANDALONE_TOPOLOGY = "standalone"
val CLUSTER_TOPOLOGY = "cluster"
val MASTER_REPLICA_TOPOLOGY = "master-replica"
Expand Down Expand Up @@ -168,13 +169,17 @@ object SentinelRedisConfiguration {
def from(config: Configuration): SentinelRedisConfiguration = from(
config.getStringArray(REDIS_URL_PROPERTY_NAME).mkString(","),
Option(config.getString(REDIS_READ_FROM_PROPERTY_NAME, null)).map(ReadFrom.valueOf).getOrElse(REDIS_READ_FROM_DEFAULT_VALUE),
Option(config.getString(REDIS_SENTINEL_PASSWORD, null)),
RedisConfiguration.redisIoThreadsFrom(config),
RedisConfiguration.redisWorkerThreadsFrom(config))

def from(redisUri: String, readFrom: ReadFrom): SentinelRedisConfiguration = from(redisUri, readFrom, None, None)
def from(redisUri: String, readFrom: ReadFrom, sentinelPassword: String): SentinelRedisConfiguration = from(redisUri, readFrom, Option.apply(sentinelPassword), None, None)

def from(redisUri: String, readFrom: ReadFrom, ioThreads: Option[Int] = None, workerThreads: Option[Int] = None): SentinelRedisConfiguration =
SentinelRedisConfiguration(RedisURI.create(redisUri), readFrom, ioThreads, workerThreads)
def from(redisUriString: String, readFrom: ReadFrom, maybeSentinelPassword: Option[String], ioThreads: Option[Int] = None, workerThreads: Option[Int] = None): SentinelRedisConfiguration = {
val redisURI = RedisURI.create(redisUriString)
maybeSentinelPassword.foreach(password => redisURI.getSentinels.forEach(uri => uri.setCredentialsProvider(RedisCredentialsProvider.from(() => RedisCredentials.just("", password)))))
SentinelRedisConfiguration(redisURI, readFrom, ioThreads, workerThreads)
}
}

case class SentinelRedisConfiguration(redisURI: RedisURI, readFrom: ReadFrom, ioThreads: Option[Int], workerThreads: Option[Int]) extends RedisConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ package org.apache.james.backends.redis

import java.time.Duration

import io.lettuce.core.{ReadFrom, RedisClient, RedisURI}
import io.lettuce.core.api.reactive.RedisReactiveCommands
import io.lettuce.core.cluster.RedisClusterClient
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands
import io.lettuce.core.codec.StringCodec
import io.lettuce.core.{RedisClient, RedisURI}
import jakarta.annotation.PreDestroy
import jakarta.inject.Inject
import org.apache.commons.lang3.StringUtils
Expand All @@ -51,7 +51,7 @@ class RedisHealthCheck @Inject()(redisConfiguration: RedisConfiguration) extends
case clusterConfiguration: ClusterRedisConfiguration => new RedisClusterHealthCheckPerform(clusterConfiguration, healthcheckTimeout)
case masterReplicaConfiguration: MasterReplicaRedisConfiguration => new RedisMasterReplicaHealthCheckPerform(masterReplicaConfiguration, healthcheckTimeout)
case sentinelRedisConfiguration: SentinelRedisConfiguration =>
new RedisSentinelHealthCheckPerform(sentinelRedisConfiguration.redisURI, sentinelRedisConfiguration.readFrom, healthcheckTimeout)
new RedisSentinelHealthCheckPerform(sentinelRedisConfiguration.redisURI, healthcheckTimeout)
case _ => throw new NotImplementedError()
}

Expand Down Expand Up @@ -158,18 +158,20 @@ class RedisMasterReplicaHealthCheckPerform(val redisConfiguration: MasterReplica
}

class RedisSentinelHealthCheckPerform(val redisURI: RedisURI,
val readFrom: ReadFrom,
val healthcheckTimeout: Duration) extends RedisHealthcheckPerform {

private val PING_SUCCESS_RESPONSE = "PONG"

private val redisClient: RedisClient = {
private val redisClient: RedisClient = RedisClient.create

private val redisCommand: RedisReactiveCommands[String, String] = {
redisURI.setTimeout(healthcheckTimeout)
RedisClient.create(redisURI)
io.lettuce.core.masterreplica.MasterReplica.connect(redisClient,
StringCodec.UTF8,
redisURI)
.reactive()
}

private val redisCommand: RedisReactiveCommands[String, String] = redisClient.connect().reactive()

override def check(): SMono[Result] =
SMono(redisCommand.ping())
.timeout(healthcheckTimeout.toScala)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ class RedisConfigurationTest extends AnyFlatSpec with Matchers {
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty("redisURL", "redis-sentinel://secret1@redis-sentinel-1:26379?sentinelMasterId=mymaster")
config.addProperty("redis.topology", "sentinel")
config.addProperty("redis.sentinelPassword", "sentinelpass")

val redisConfig: RedisConfiguration = RedisConfiguration.from(config)
redisConfig.isInstanceOf[SentinelRedisConfiguration] shouldEqual (true)
val redisConfiguration = redisConfig.asInstanceOf[SentinelRedisConfiguration]

redisConfiguration.redisURI.toString shouldEqual "redis-sentinel://*******@redis-sentinel-1?sentinelMasterId=mymaster"
redisConfiguration.redisURI.getSentinels.get(0).getCredentialsProvider.resolveCredentials().block().getPassword shouldEqual "sentinelpass".toCharArray
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/** **************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
* ************************************************************** */

package org.apache.james.backends.redis

import java.util.concurrent.TimeUnit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

public class RedisSentinelExtension implements GuiceModuleTestExtension {
public static final int SENTINEL_PORT = 26379;
public static final String SENTINEL_PASSWORD = "321";

public static class RedisMasterReplicaContainerList extends ArrayList<GenericContainer> {
public RedisMasterReplicaContainerList(Collection<? extends GenericContainer> c) {
Expand Down Expand Up @@ -81,7 +82,23 @@ public RedisSentinelContainerList(Collection<? extends GenericContainer> c) {

public SentinelRedisConfiguration getRedisConfiguration() {
return SentinelRedisConfiguration.from(createRedisSentinelURI(this),
ReadFrom.MASTER);
ReadFrom.MASTER,
SENTINEL_PASSWORD);
}

public void pauseFirstNode() {
GenericContainer container = this.get(0);
container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
}

public void unPauseFirstNode() {
GenericContainer container = this.get(0);
if (container.getDockerClient().inspectContainerCmd(container.getContainerId())
.exec()
.getState()
.getPaused()) {
container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
}
}
}

Expand All @@ -94,7 +111,7 @@ public record RedisSentinelCluster(RedisMasterReplicaContainerList redisMasterRe
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withName("james-" + alias + "-test-" + UUID.randomUUID()))
.withCommand(Optional.of(isSlave).filter(aBoolean -> aBoolean)
.map(aBoolean -> "redis-server --appendonly yes --port 6379 --slaveof redis1 6379 --requirepass 1 --masterauth 1")
.orElse("redis-server --appendonly yes --port 6379 --requirepass 1"))
.orElse("redis-server --appendonly yes --port 6379 --requirepass 1 --masterauth 1"))
.withNetworkAliases(alias)
.waitingFor(Wait.forLogMessage(".*Ready to accept connections.*", 1)
.withStartupTimeout(Duration.ofMinutes(2)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.james.backends.redis

import org.apache.james.backends.redis.RedisSentinelExtension.RedisSentinelCluster
import org.apache.james.server.core.filesystem.FileSystemImpl
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach}

Expand Down
3 changes: 2 additions & 1 deletion backends-common/redis/src/test/resources/sentinel.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ sentinel monitor mymaster redis1 6379 2
sentinel auth-pass mymaster 1
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
sentinel parallel-syncs mymaster 1
sentinel parallel-syncs mymaster 1
requirepass "321"
3 changes: 3 additions & 0 deletions docs/modules/servers/partials/configure/redis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Reference: https://github.com/redis/lettuce/wiki/Redis-URI-and-connection-detail

Reference: https://github.com/redis/lettuce/wiki/ReadFrom-Settings

| redis.sentinelPassword
| Redis sentinel password. If not specified, the redis driver will not set password for sentinel.

| redis.ioThreads
| IO threads to be using for the underlying Netty networking resources. If unspecified driver defaults applies.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.james.rate.limiter.redis

import java.time.Duration

import com.google.common.collect.ImmutableList
import com.google.inject.{AbstractModule, Provides, Scopes}
import es.moki.ratelimitj.core.limiter.request.{AbstractRequestRateLimiterFactory, ReactiveRequestRateLimiter, RequestLimitRule}
import es.moki.ratelimitj.redis.request.{RedisClusterRateLimiterFactory, RedisSlidingWindowRequestRateLimiter, RedisRateLimiterFactory => RedisSingleInstanceRateLimitjFactory}
Expand Down Expand Up @@ -67,7 +68,11 @@ class RedisRateLimiterFactory @Inject()(redisConfiguration: RedisConfiguration)
masterReplicaRedisConfiguration.redisURI.value.asJava,
masterReplicaRedisConfiguration.readFrom)

case sentinelRedisConfiguration: SentinelRedisConfiguration => new RedisSingleInstanceRateLimitjFactory(RedisClient.create(sentinelRedisConfiguration.redisURI))
case sentinelRedisConfiguration: SentinelRedisConfiguration =>
new RedisMasterReplicaRateLimiterFactory(RedisClient.create(),
ImmutableList.of(sentinelRedisConfiguration.redisURI),
sentinelRedisConfiguration.readFrom
)

case _ => throw new NotImplementedError()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import org.apache.james.rate.limiter.api._
import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory
import org.assertj.core.api.Assertions.{assertThat, assertThatCode}
import org.awaitility.Awaitility
import org.junit.jupiter.api.{AfterEach, Disabled, Test}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, Test}
import reactor.core.scala.publisher.SMono

object RedisRateLimiterWithSentinelTest {
Expand All @@ -45,6 +45,7 @@ class RedisRateLimiterWithSentinelTest {

@AfterEach
def afterEach(redisClusterContainer: RedisSentinelCluster): Unit = {
redisClusterContainer.redisSentinelContainerList().unPauseFirstNode()
redisClusterContainer.redisMasterReplicaContainerList.unPauseMasterNode()
}

Expand All @@ -65,7 +66,6 @@ class RedisRateLimiterWithSentinelTest {
}

@Test
@Disabled
def rateLimitShouldWorkNormallyAfterFailoverComplete(redisClusterContainer: RedisSentinelCluster): Unit = {
val rateLimiterFactory: RedisRateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration)
val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION)
Expand All @@ -74,6 +74,8 @@ class RedisRateLimiterWithSentinelTest {
assertThat(SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString), 5)).block())
.isEqualTo(RateExceeded)

// Pause first sentinel node
redisClusterContainer.redisSentinelContainerList().pauseFirstNode()
// Give stop redis-master node
redisClusterContainer.redisMasterReplicaContainerList.pauseMasterNode()
// Sleep for a while to let sentinel detect the failover. Here is 5 seconds
Expand All @@ -82,7 +84,7 @@ class RedisRateLimiterWithSentinelTest {
// After failover, the rate limit should be working normally
Awaitility.await()
.pollInterval(2, TimeUnit.SECONDS)
.atMost(20, TimeUnit.SECONDS)
.atMost(100, TimeUnit.SECONDS)
.untilAsserted(() => assertThatCode(() => SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString), 1)).block())
.doesNotThrowAnyException())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/** **************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
* ************************************************************** */

package org.apache.james.rate.limiter

import java.time.Duration
Expand Down

0 comments on commit e631d83

Please sign in to comment.