Skip to content

Commit

Permalink
[CCR] Sync mappings between leader and follow index
Browse files Browse the repository at this point in the history
The shard changes api returns the minimum IndexMetadata version the leader
index needs to have. If the leader side is behind on IndexMetadata version
then follow shard task waits with processing write operations until the
mapping has been fetched from leader index and applied in follower index
in the background.

The cluster state api is used to fetch the leader mapping and put mapping api
to apply the mapping in the follower index. This works because put mapping
api accepts fields that are already defined.

Relates to elastic#30086
  • Loading branch information
martijnvg committed Apr 25, 2018
1 parent b126ffa commit 4330bf6
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public void testFollowIndex() throws Exception {
Settings indexSettings = Settings.builder()
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}");
createIndex(followIndexName, indexSettings);
ensureYellow(followIndexName);

followIndex("leader_cluster:" + leaderIndexName, followIndexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -159,28 +160,36 @@ public int hashCode() {

public static final class Response extends ActionResponse {

private long indexMetadataVersion;
private Translog.Operation[] operations;

Response() {
}

Response(final Translog.Operation[] operations) {
Response(long indexMetadataVersion, final Translog.Operation[] operations) {
this.indexMetadataVersion = indexMetadataVersion;
this.operations = operations;
}

public long getIndexMetadataVersion() {
return indexMetadataVersion;
}

public Translog.Operation[] getOperations() {
return operations;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indexMetadataVersion = in.readVLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(indexMetadataVersion);
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -189,12 +198,16 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Response response = (Response) o;
return Arrays.equals(operations, response.operations);
return indexMetadataVersion == response.indexMetadataVersion &&
Arrays.equals(operations, response.operations);
}

@Override
public int hashCode() {
return Arrays.hashCode(operations);
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Arrays.hashCode(operations);
return result;
}
}

Expand Down Expand Up @@ -227,7 +240,8 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());

return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes, indexMetaData);
}

@Override
Expand All @@ -252,7 +266,8 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit,
IndexMetaData indexMetaData) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
Expand All @@ -276,17 +291,17 @@ static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long
seenBytes += orderedOp.estimateSize();
operations.add(orderedOp);
if (nextExpectedSeqNo > maxSeqNo) {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
}
} else {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
}
}
}
}

if (nextExpectedSeqNo >= maxSeqNo) {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
} else {
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
"] found, tracker checkpoint [" + nextExpectedSeqNo + "]";
Expand Down
Loading

0 comments on commit 4330bf6

Please sign in to comment.