Skip to content

Commit

Permalink
Sync the mapping from leader index and retry for MapperParsingExcepti…
Browse files Browse the repository at this point in the history
…on (#411) (#414) (#423)

Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala authored Jun 3, 2022
1 parent 680212b commit 829338a
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexingPressureService
import org.opensearch.index.engine.Engine
import org.opensearch.index.mapper.MapperParsingException
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
Expand Down Expand Up @@ -119,8 +120,9 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
if(primaryShard.maxSeqNoOfUpdatesOrDeletes < request.maxSeqNoOfUpdatesOrDeletes) {
primaryShard.advanceMaxSeqNoOfUpdatesOrDeletes(request.maxSeqNoOfUpdatesOrDeletes)
}

var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
if (result.resultType == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
if (shouldSyncMappingAndRetry(result)) {
waitForMappingUpdate {
// fetch mappings from the leader cluster when applying on PRIMARY...
syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName)
Expand All @@ -133,6 +135,15 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
val response = ReplayChangesResponse() // TODO: Figure out what to add to response
return WritePrimaryResult(request, response, location, null, primaryShard, log)
}
fun shouldSyncMappingAndRetry(result: Engine.Result): Boolean {
/*
1. Incase the doc index requires a mapping update, we get the result as MAPPING_UPDATE_REQUIRED.
2. If the dynamic mapping is set to strict, IndexShard will simply reject the applyTranslogOperation operation
as expected. This can happen if user has already updated the mapping on leader but its not present on the follower yet.
So in both case, we sync the mapping from leader index and retry the applyTranslogOperation.
*/
return result.resultType == Engine.Result.Type.MAPPING_UPDATE_REQUIRED || result.failure is MapperParsingException
}

/**
* This requires duplicating the code above due to mapping updates being asynchronous.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.opensearch.replication.task.shard

import org.assertj.core.api.Assertions
import org.junit.Assert
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.core.CountRequest
import org.opensearch.client.indices.PutMappingRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.replication.FOLL
import org.opensearch.replication.LEADER
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.`validate status syncing response`
import org.opensearch.replication.replicationStatus
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import java.util.Locale
import java.util.concurrent.TimeUnit

const val LEADER = "leaderCluster"
const val FOLL = "followCluster"

@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLL)
)
class TransportReplayChangesActionIT : MultiClusterRestTestCase() {
fun `test strict dynamic mapping update`() {
val follower = getClientForCluster(FOLL)
val leader = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLL, LEADER)
// Create a leader/follower index
val leaderIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val followerIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)

try {
val doc1 = mapOf("name" to randomAlphaOfLength(20))
// Create Leader Index
val response = leader.index(IndexRequest(leaderIndex).id("1").source(doc1), RequestOptions.DEFAULT)
Assertions.assertThat(response.result)
.withFailMessage("Failed to create leader data").isEqualTo(DocWriteResponse.Result.CREATED)

// Setup Mapping on leader
var putMappingRequest = PutMappingRequest(leaderIndex)
putMappingRequest.source(
"{\"dynamic\":\"strict\",\"properties\":{\"name\":{\"type\":\"text\"}}}",
XContentType.JSON
)
leader.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT)

// Start replication
follower.startReplication(
StartReplicationRequest("source", leaderIndex, followerIndex),
waitForRestore = true
)
assertBusy {
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
Assertions.assertThat(getResponse.isExists).isTrue()
Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(doc1)
}

// Add a new field in mapping.
putMappingRequest = PutMappingRequest(leaderIndex)
putMappingRequest.source(
"{\"dynamic\":\"strict\",\"properties\":{\"name\":{\"type\":\"text\"},\"place\":{\"type\":\"text\"}}}",
XContentType.JSON
)
leader.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT)

// Ingest a doc on the leader
val doc2 = mapOf("name" to randomAlphaOfLength(5), "place" to randomAlphaOfLength(5))
leader.index(IndexRequest(leaderIndex).id("2").source(doc2), RequestOptions.DEFAULT)

// Verify that replication is working as expected.
assertBusy ({
Assert.assertEquals(leader.count(CountRequest(leaderIndex), RequestOptions.DEFAULT).toString(),
follower.count(CountRequest(followerIndex), RequestOptions.DEFAULT).toString())
`validate status syncing response`(follower.replicationStatus(followerIndex))
val getResponse = follower.get(GetRequest(followerIndex, "2"), RequestOptions.DEFAULT)
Assertions.assertThat(getResponse.isExists).isTrue()
Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(doc2)
},
30, TimeUnit.SECONDS
)

} finally {
follower.stopReplication(followerIndex)
}

}
}

0 comments on commit 829338a

Please sign in to comment.