Skip to content

Commit

Permalink
Rename the replication settings (#103)
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>

Co-authored-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala and ankitkala authored Aug 18, 2021
1 parent c75ff5a commit 6946805
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 26 deletions.
28 changes: 14 additions & 14 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,26 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
const val REPLICATION_EXECUTOR_NAME_FOLLOWER = "replication_follower"
val REPLICATED_INDEX_SETTING: Setting<String> = Setting.simpleString("index.plugins.replication.replicated",
Setting.Property.InternalIndex, Setting.Property.IndexScope)
val REPLICATION_CHANGE_BATCH_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.ops_batch_size", 50000, 16,
val REPLICATION_FOLLOWER_OPS_BATCH_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.follower.index.ops_batch_size", 50000, 16,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.size", 0, 0,
val REPLICATION_LEADER_THREADPOOL_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.thread_pool.size", 0, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.queue_size", 1000, 0,
val REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.thread_pool.queue_size", 1000, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE: Setting<ByteSizeValue> = Setting.byteSizeSetting("plugins.replication.index.recovery.chunk_size", ByteSizeValue(10, ByteSizeUnit.MB),
val REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE: Setting<ByteSizeValue> = Setting.byteSizeSetting("plugins.replication.follower.index.recovery.chunk_size", ByteSizeValue(10, ByteSizeUnit.MB),
ByteSizeValue(1, ByteSizeUnit.MB), ByteSizeValue(1, ByteSizeUnit.GB),
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS: Setting<Int> = Setting.intSetting("plugins.replication.index.recovery.max_concurrent_file_chunks", 5, 1,
val REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS: Setting<Int> = Setting.intSetting("plugins.replication.follower.index.recovery.max_concurrent_file_chunks", 5, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_PARALLEL_READ_PER_SHARD = Setting.intSetting("plugins.replication.parallel_reads_per_shard", 2, 1,
val REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_readers_per_shard", 2, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_PARALLEL_READ_POLL_DURATION = Setting.timeSetting ("plugins.replication.parallel_reads_poll_duration", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1),
val REPLICATION_PARALLEL_READ_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.follower.poll_interval", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1),
TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_DURATION = Setting.timeSetting ("plugins.replication.autofollow.remote.fetch_poll_duration", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30),
val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.fetch_poll_interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30),
TimeValue.timeValueHours(1), Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_DURATION = Setting.timeSetting ("plugins.replication.autofollow.remote.retry_poll_duration", TimeValue.timeValueHours(1), TimeValue.timeValueMinutes(30),
val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.retry_poll_interval", TimeValue.timeValueHours(1), TimeValue.timeValueMinutes(30),
TimeValue.timeValueHours(4), Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_METADATA_SYNC_INTERVAL = Setting.timeSetting("plugins.replication.metadata_sync",
val REPLICATION_METADATA_SYNC_INTERVAL = Setting.timeSetting("plugins.replication.follower.metadata_sync_interval",
TimeValue.timeValueSeconds(60), TimeValue.timeValueSeconds(5),
Setting.Property.Dynamic, Setting.Property.NodeScope)
}
Expand Down Expand Up @@ -308,11 +308,11 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_CHANGE_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_PARALLEL_READ_PER_SHARD,
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_DURATION, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_DURATION,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_DURATION, REPLICATION_METADATA_SYNC_INTERVAL)
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL)
}

override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
Expand Down
20 changes: 10 additions & 10 deletions src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ class ReplicationSettings(clusterService: ClusterService) {

@Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings)
@Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings)
@Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_PER_SHARD)
@Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_CHANGE_BATCH_SIZE)
@Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_DURATION)
@Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_DURATION)
@Volatile var autofollowRetryPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_DURATION)
@Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD)
@Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE)
@Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL)
@Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL)
@Volatile var autofollowRetryPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL)
@Volatile var metadataSyncInterval = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL)
init {
listenForUpdates(clusterService.clusterSettings)
Expand All @@ -33,11 +33,11 @@ class ReplicationSettings(clusterService: ClusterService) {
private fun listenForUpdates(clusterSettings: ClusterSettings) {
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_PER_SHARD) { value: Int -> this.readersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_CHANGE_BATCH_SIZE) { batchSize = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_DURATION) { pollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_DURATION) { autofollowFetchPollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_DURATION) { autofollowRetryPollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) { value: Int -> this.readersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) { batchSize = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) { pollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL) { autofollowFetchPollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL) { autofollowRetryPollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL) { metadataSyncInterval = it }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
protected fun setMetadataSyncDelay() {
val followerClient = getClientForCluster(FOLLOWER)
val updateSettingsRequest = ClusterUpdateSettingsRequest()
updateSettingsRequest.transientSettings(Collections.singletonMap<String, String?>("plugins.replication.metadata_sync", "5s"))
updateSettingsRequest.transientSettings(Collections.singletonMap<String, String?>(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL.key, "5s"))
followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.junit.Assert
import java.util.Locale
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.test.OpenSearchTestCase.assertBusy
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -93,7 +94,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {

try {
// Set poll duration to 30sec from 60sec (default)
val settings = Settings.builder().put("plugins.replication.autofollow.remote.fetch_poll_duration", TimeValue.timeValueSeconds(30))
val settings = Settings.builder().put(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL.key,
TimeValue.timeValueSeconds(30))
val clusterUpdateSetttingsReq = ClusterUpdateSettingsRequest().persistentSettings(settings)
val clusterUpdateResponse = followerClient.cluster().putSettings(clusterUpdateSetttingsReq, RequestOptions.DEFAULT)
Assert.assertTrue(clusterUpdateResponse.isAcknowledged)
Expand Down

0 comments on commit 6946805

Please sign in to comment.