Skip to content

Commit

Permalink
Added support for start replication block (#251)
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Kumar <karanas@amazon.com>
  • Loading branch information
saikaranam-amazon authored Nov 14, 2021
1 parent 2dc458d commit 53ffede
Showing 7 changed files with 93 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -184,6 +184,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION = Setting.timeSetting ("plugins.replication.follower.retention_lease_max_failure_duration", TimeValue.timeValueHours(1), TimeValue.timeValueSeconds(1),
TimeValue.timeValueHours(12), Setting.Property.Dynamic, Setting.Property.NodeScope)
/**
 * Boolean cluster setting `plugins.replication.follower.block.start`, registered as
 * dynamic and node-scoped, defaulting to `false`.
 *
 * While it is `true`, the replicate-index master node action refuses to start
 * replication and fails the request with a FORBIDDEN status.
 */
val REPLICATION_FOLLOWER_BLOCK_START: Setting<Boolean> = Setting.boolSetting("plugins.replication.follower.block.start", false,
Setting.Property.Dynamic, Setting.Property.NodeScope)
}

override fun createComponents(client: Client, clusterService: ClusterService, threadPool: ThreadPool,
@@ -340,7 +342,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION)
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_FOLLOWER_BLOCK_START)
}

override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
Original file line number Diff line number Diff line change
@@ -30,6 +30,8 @@ open class ReplicationSettings(clusterService: ClusterService) {
@Volatile var autofollowRetryPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL)
@Volatile var metadataSyncInterval = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL)
@Volatile var leaseRenewalMaxFailureDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION)
@Volatile var followerBlockStart: Boolean = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START)

init {
listenForUpdates(clusterService.clusterSettings)
}
@@ -44,5 +46,6 @@ open class ReplicationSettings(clusterService: ClusterService) {
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL) { autofollowFetchPollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL) { autofollowRetryPollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL) { metadataSyncInterval = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START) { followerBlockStart = it }
}
}
Original file line number Diff line number Diff line change
@@ -57,10 +57,10 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
}

override fun doExecute(task: Task, request: ReplicateIndexRequest, listener: ActionListener<ReplicateIndexResponse>) {
log.info("Setting-up replication for ${request.leaderAlias}:${request.leaderIndex} -> ${request.followerIndex}")
val user = SecurityContext.fromSecurityThreadContext(threadPool.threadContext)
launch(threadPool.coroutineContext()) {
listener.completeWith {
log.info("Setting-up replication for ${request.leaderAlias}:${request.leaderIndex} -> ${request.followerIndex}")
val user = SecurityContext.fromSecurityThreadContext(threadPool.threadContext)

val followerReplContext = ReplicationContext(request.followerIndex,
user?.overrideFgacRole(request.useRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE)))
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.IndicesOptions
@@ -43,7 +44,9 @@ import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.index.IndexNotFoundException
import org.opensearch.persistent.PersistentTasksService
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestStatus
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.io.IOException
@@ -87,6 +90,12 @@ class TransportReplicateIndexMasterNodeAction @Inject constructor(transportServi
// for each shard. If that takes too long we can start the task asynchronously and return the response first.
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
try {
if(clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START)) {
log.debug("Replication cannot be started as " +
"start block(${ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START}) is set")
throw OpenSearchStatusException("[FORBIDDEN] Replication START block is set", RestStatus.FORBIDDEN)
}

val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex)

if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) {
11 changes: 11 additions & 0 deletions src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.opensearch.test.rest.OpenSearchRestTestCase
import org.junit.Assert
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

@@ -369,3 +370,13 @@ fun RestHighLevelClient.deleteAutoFollowPattern(connection: String, patternName:
val response = getAckResponse(lowLevelResponse)
assertThat(response.isAcknowledged).isTrue()
}

/**
 * Writes `plugins.replication.follower.block.start` as a persistent cluster setting
 * through this client and asserts that the cluster acknowledged the update.
 *
 * @param enabled value to store for the start-block setting.
 */
fun RestHighLevelClient.updateReplicationStartBlockSetting(enabled: Boolean) {
    val blockSettings: Settings = Settings.builder()
        .put("plugins.replication.follower.block.start", enabled)
        .build()
    val request = ClusterUpdateSettingsRequest().persistentSettings(blockSettings)
    val ack = cluster().putSettings(request, RequestOptions.DEFAULT)
    assertThat(ack.isAcknowledged).isTrue()
}
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ import org.opensearch.replication.followerStats
import org.opensearch.replication.leaderStats
import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.log
import java.lang.Thread.sleep
import org.opensearch.replication.updateReplicationStartBlockSetting
import java.nio.file.Files
import java.util.concurrent.TimeUnit

@@ -1121,6 +1122,36 @@ class StartReplicationIT: MultiClusterRestTestCase() {
"Value .leaderIndex must not start with '.'")
}

/**
 * Verifies that a start-replication request is rejected with a FORBIDDEN error while
 * the follower start block is set, and that the same request succeeds once the block
 * has been removed.
 */
fun `test that replication is not started when start block is set`() {
    val followerClient = getClientForCluster(FOLLOWER)
    val leaderClient = getClientForCluster(LEADER)

    createConnectionBetweenClusters(FOLLOWER, LEADER)
    val createIndexResponse = leaderClient.indices().create(
        CreateIndexRequest(leaderIndexName),
        RequestOptions.DEFAULT
    )
    assertThat(createIndexResponse.isAcknowledged).isTrue()

    try {
        // Setting to add replication start block
        followerClient.updateReplicationStartBlockSetting(true)

        assertThatThrownBy {
            followerClient.startReplication(
                StartReplicationRequest("source", leaderIndexName, followerIndexName),
                waitForRestore = true
            )
        }
            .isInstanceOf(ResponseException::class.java)
            .hasMessageContaining("[FORBIDDEN] Replication START block is set")
    } finally {
        // The block is written as a cluster setting, so clear it even when the
        // assertion above fails; otherwise it would stay set on the follower cluster.
        followerClient.updateReplicationStartBlockSetting(false)
    }

    // With the block removed, replication must start normally.
    try {
        followerClient.startReplication(
            StartReplicationRequest("source", leaderIndexName, followerIndexName),
            waitForRestore = true
        )
    } finally {
        followerClient.stopReplication(followerIndexName)
    }
}

private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) {
assertThatThrownBy {
client.startReplication(StartReplicationRequest("source", leader, follower))
Original file line number Diff line number Diff line change
@@ -41,8 +41,10 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.replication.AutoFollowStats
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.updateReplicationStartBlockSetting
import org.opensearch.replication.waitForShardTaskStart
import org.opensearch.test.OpenSearchTestCase.assertBusy
import java.lang.Thread.sleep
import java.util.HashMap
import java.util.concurrent.TimeUnit

@@ -335,6 +337,38 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
followerClient.stopReplication(leaderIndexName)
}

/**
 * Verifies how autofollow interacts with the follower start block: while the block is
 * set, the autofollow task keeps running but no index replication task is started;
 * after the block is removed, the matching leader index gets replicated.
 */
fun `test autofollow task with start replication block`() {
    val followerClient = getClientForCluster(FOLLOWER)
    val leaderClient = getClientForCluster(LEADER)
    createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias)

    val leaderIndexName = createRandomIndex(leaderClient)
    try {

        // Add replication start block
        followerClient.updateReplicationStartBlockSetting(true)
        followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)
        sleep(30000) // Default poll for auto follow in worst case

        // verify both index replication tasks and autofollow tasks
        // Replication shouldn't have been started - 0 tasks
        // Autofollow task should still be up - 1 task
        Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(0)
        Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)

        // Remove replication start block
        followerClient.updateReplicationStartBlockSetting(false)
        sleep(45000) // poll for auto follow in worst case

        // Index should be replicated and autofollow task should be present
        Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
        Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
    } finally {
        // Clear the block first: if an assertion failed while it was still set, it
        // would otherwise stay set on the follower cluster after this test.
        followerClient.updateReplicationStartBlockSetting(false)
        followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
        followerClient.stopReplication(leaderIndexName)
    }
}

fun createRandomIndex(client: RestHighLevelClient): String {
val indexName = indexPrefix + randomAlphaOfLength(6).toLowerCase(Locale.ROOT)
val createIndexResponse = client.indices().create(CreateIndexRequest(indexName), RequestOptions.DEFAULT)

0 comments on commit 53ffede

Please sign in to comment.