Skip to content

Commit

Permalink
[CCR] Sync mappings between leader and follow index (#30115)
Browse files Browse the repository at this point in the history
The shard changes api returns the minimum IndexMetadata version the leader
index needs to have. If the leader side is behind on IndexMetadata version
then follow shard task waits with processing write operations until the
mapping has been fetched from leader index and applied in follower index
in the background.

The cluster state api is used to fetch the leader mapping and put mapping api
to apply the mapping in the follower index. This works because put mapping
api accepts fields that are already defined.

Relates to #30086
  • Loading branch information
martijnvg committed May 28, 2018
1 parent fe00232 commit 0b22392
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,28 +156,36 @@ public int hashCode() {

public static final class Response extends ActionResponse {

private long indexMetadataVersion;
private Translog.Operation[] operations;

Response() {
}

Response(final Translog.Operation[] operations) {
Response(long indexMetadataVersion, final Translog.Operation[] operations) {
this.indexMetadataVersion = indexMetadataVersion;
this.operations = operations;
}

public long getIndexMetadataVersion() {
return indexMetadataVersion;
}

public Translog.Operation[] getOperations() {
return operations;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indexMetadataVersion = in.readVLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(indexMetadataVersion);
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -186,12 +194,16 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Response response = (Response) o;
return Arrays.equals(operations, response.operations);
return indexMetadataVersion == response.indexMetadataVersion &&
Arrays.equals(operations, response.operations);
}

@Override
public int hashCode() {
return Arrays.hashCode(operations);
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Arrays.hashCode(operations);
return result;
}
}

Expand Down Expand Up @@ -224,8 +236,11 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());

final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
final Translog.Operation[] operations =
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
return new Response(indexMetaDataVersion, operations);
}

@Override
Expand All @@ -250,7 +265,8 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo,
long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
Expand All @@ -266,6 +282,6 @@ static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long
}
}
}
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
}
Loading

0 comments on commit 0b22392

Please sign in to comment.