Skip to content

Commit

Permalink
Remove use of ImmutableOpenMap
Browse files Browse the repository at this point in the history
Signed-off-by: Monu Singh <[email protected]>
  • Loading branch information
monusingh-1 committed May 10, 2023
1 parent 3b43e55 commit 43cde95
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 56 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@

distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService
private suspend fun isResumable(params :IndexReplicationParams): Boolean {
var isResumable = true
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName).shards()
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName)?.shards()
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)
shards.forEach {
shards?.forEach {
val followerShardId = it.value.shardId
if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId)) {
isResumable = false
Expand All @@ -146,7 +146,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService

// clean up all retention leases we may have accidentally took while doing verifyRetentionLeaseExist .
// Idempotent Op which does no harm
shards.forEach {
shards?.forEach {
val followerShardId = it.value.shardId
log.debug("Removing lease for $followerShardId.id ")
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ class TransportUpdateMetadataAction @Inject constructor(
val indexAsArray = arrayOf(concreteIndex)
val aliasMetadata = metadata.findAliases(action, indexAsArray)
val finalAliases: MutableList<String> = ArrayList()
for (curAliases in aliasMetadata.values()) {
for (aliasMeta in curAliases.value) {
for (curAliases in aliasMetadata.values) {
for (aliasMeta in curAliases) {
finalAliases.add(aliasMeta.alias())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.block.ClusterBlockLevel
import org.opensearch.cluster.block.ClusterBlocks
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.index.IndexNotFoundException
import org.opensearch.rest.RestStatus
import java.util.*
import java.util.Collections
import java.util.EnumSet


/* This is our custom index block to prevent changes to follower
Expand All @@ -49,11 +49,11 @@ fun checkIfIndexBlockedWithLevel(clusterService: ClusterService,
clusterBlockLevel: ClusterBlockLevel) {
clusterService.state().routingTable.index(indexName) ?:
throw IndexNotFoundException("Index with name:$indexName doesn't exist")
val writeIndexBlockMap : ImmutableOpenMap<String, Set<ClusterBlock>> = clusterService.state().blocks()
val writeIndexBlockMap : Map<String, Set<ClusterBlock>> = clusterService.state().blocks()
.indices(clusterBlockLevel)
if (!writeIndexBlockMap.containsKey(indexName))
return
val clusterBlocksSet : Set<ClusterBlock> = writeIndexBlockMap.get(indexName)
val clusterBlocksSet : Set<ClusterBlock> = writeIndexBlockMap.getOrDefault(indexName, Collections.emptySet())
if (clusterBlocksSet.contains(INDEX_REPLICATION_BLOCK)
&& clusterBlocksSet.size > 1)
throw ClusterBlockException(clusterBlocksSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.valuesIt().forEach {
indexMetadata.aliases.values.forEach {
indexMdBuilder.putAlias(it)
}
return indexMdBuilder.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ class RemoteClusterRetentionLeaseHelper constructor(val followerClusterName: Str
val remoteMetadata = getLeaderIndexMetadata(replMetadata.connectionName, replMetadata.leaderContext.resource)
val params = IndexReplicationParams(replMetadata.connectionName, remoteMetadata.index, followerIndexName)
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName).shards()
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName)?.shards()
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)
shards.forEach {
shards?.forEach {
val followerShardId = it.value.shardId
log.debug("Removing lease for $followerShardId.id ")
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
val clusterState = clusterService.state()
val persistentTasks = clusterState.metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)

val followerShardIds = clusterService.state().routingTable.indicesRouting().get(followerIndexName).shards()
.map { shard -> shard.value.shardId }
.stream().collect(Collectors.toSet())
val followerShardIds = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
?.map { shard -> shard.value.shardId }
?.stream()?.collect(Collectors.toSet()).orEmpty()
val runningShardTasksForIndex = persistentTasks.findTasks(ShardReplicationExecutor.TASK_NAME, Predicate { true }).stream()
.map { task -> task.params as ShardReplicationParams }
.filter {taskParam -> followerShardIds.contains(taskParam.followerShardId) }
Expand Down Expand Up @@ -434,16 +434,16 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
// If we we want to retrieve just the version of settings and alias versions, there are two options
// 1. Include this in GetChanges and communicate it to IndexTask via Metadata
// 2. Add another API to retrieve version of settings & aliases. Persist current version in Metadata
var leaderSettings = settingsResponse.indexToSettings.get(this.leaderIndex.name)
leaderSettings = leaderSettings.filter { k: String? ->
var leaderSettings = settingsResponse.indexToSettings.getOrDefault(this.leaderIndex.name, Settings.EMPTY)
leaderSettings = leaderSettings.filter { k: String ->
!blockListedSettings.contains(k)
}

gsr = GetSettingsRequest().includeDefaults(false).indices(this.followerIndexName)
settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
var followerSettings = settingsResponse.indexToSettings.get(this.followerIndexName)
var followerSettings = settingsResponse.indexToSettings.getOrDefault(this.followerIndexName, Settings.EMPTY)

followerSettings = followerSettings.filter { k: String? ->
followerSettings = followerSettings.filter { k: String ->
k != REPLICATED_INDEX_SETTING.key
}

Expand Down Expand Up @@ -516,11 +516,11 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
//Alias
var getAliasesRequest = GetAliasesRequest().indices(this.leaderIndex.name)
var getAliasesRes = remoteClient.suspending(remoteClient.admin().indices()::getAliases, injectSecurityContext = true)(getAliasesRequest)
var leaderAliases = getAliasesRes.aliases.get(this.leaderIndex.name)
var leaderAliases = getAliasesRes.aliases.getOrDefault(this.leaderIndex.name, Collections.emptyList())

getAliasesRequest = GetAliasesRequest().indices(followerIndexName)
getAliasesRes = client.suspending(client.admin().indices()::getAliases, injectSecurityContext = true)(getAliasesRequest)
var followerAliases = getAliasesRes.aliases.get(followerIndexName)
var followerAliases = getAliasesRes.aliases.getOrDefault(followerIndexName, Collections.emptyList())

var request :IndicesAliasesRequest?

Expand Down Expand Up @@ -606,8 +606,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript

try {
//Step 1 : Remove the tasks
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName).shards()
shards.forEach {
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
shards?.forEach {
persistentTasksService.removeTask(ShardReplicationTask.taskIdForShard(it.value.shardId))
}

Expand Down Expand Up @@ -748,7 +748,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript

suspend fun startNewOrMissingShardTasks(): Map<ShardId, PersistentTask<ShardReplicationParams>> {
assert(clusterService.state().routingTable.hasIndex(followerIndexName)) { "Can't find index $followerIndexName" }
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName).shards()
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
val persistentTasks = clusterService.state().metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
val runningShardTasks = persistentTasks.findTasks(ShardReplicationExecutor.TASK_NAME, Predicate { true }).stream()
.map { task -> task as PersistentTask<ShardReplicationParams> }
Expand All @@ -757,14 +757,14 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
{t: PersistentTask<ShardReplicationParams> -> t.params!!.followerShardId},
{t: PersistentTask<ShardReplicationParams> -> t}))

val tasks = shards.map {
val tasks = shards?.map {
it.value.shardId
}.associate { shardId ->
}?.associate { shardId ->
val task = runningShardTasks.getOrElse(shardId) {
startReplicationTask(ShardReplicationParams(leaderAlias, ShardId(leaderIndex, shardId.id), shardId))
}
return@associate shardId to task
}
}.orEmpty()

return tasks
}
Expand Down Expand Up @@ -865,9 +865,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
This can happen if there was a badly timed cluster manager node failure.""".trimIndent())
}
} else if (restore.state() == RestoreInProgress.State.FAILURE) {
val failureReason = restore.shards().values().find {
it.value.state() == RestoreInProgress.State.FAILURE
}!!.value.reason()
val failureReason = restore.shards().values.find {
it.state() == RestoreInProgress.State.FAILURE
}!!.reason()
return FailedState(Collections.emptyMap(), failureReason)
} else {
return InitFollowState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)[IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)

settings = Settings.builder()
Expand All @@ -242,7 +242,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"checksum",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName]["index.shard.check_on_startup"]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)["index.shard.check_on_startup"]
)
}, 30L, TimeUnit.SECONDS)
}
Expand Down Expand Up @@ -272,7 +272,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)[IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)
settings = Settings.builder()
.put("index.shard.check_on_startup", "checksum")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class SecurityDlsFlsIT: SecurityBase() {
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)[IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)
settings = Settings.builder()
.put("index.shard.check_on_startup", "checksum")
Expand Down
Loading

0 comments on commit 43cde95

Please sign in to comment.