Introduce cross-cluster replication #30086

Closed · 27 of 29 tasks

elasticmachine opened this issue Sep 5, 2017 · 6 comments · Fixed by #39722
Labels
:Distributed Indexing/CCR Issues around the Cross Cluster State Replication features Meta

Comments


elasticmachine commented Sep 5, 2017

Issue description

Original comment by @jasontedor:

The goal of cross-cluster replication is to enable users to replicate operations from one cluster to another cluster via active-passive replication. There are three drivers for providing CCR functionality in X-Pack:

  • resiliency: if the primary cluster fails, a secondary cluster can serve as a hot backup
  • geo-proximity: reads can be served locally
  • ECE can use the infrastructure to provide a UI for replicating data from one cluster to another

The purpose of this meta-issue is to serve as a high-level plan for the initial implementation.

Our initial implementation will be a shard-to-shard, pull-based model built on the transport layer, without automatic setup.

In this model, shards of a following index are responsible for pulling operations from shards of the leader index (a minimal sketch of this pull loop follows the list below). We chose this model because:

  • it has simpler state management than a push-based model (each follower has exactly one leader, while a leader may have many followers)
  • recovery after a failure is simpler, as the follower knows how far it has progressed in the sequence of operations
  • operations can be streamed from any copy of the leader shard (the primary shard, or any of its replicas)
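To make the pull model concrete, here is a minimal sketch of the follower-side loop. All names here (`LeaderShardClient`, `Operation`, `ShardFollowLoop`) are hypothetical, not Elasticsearch classes: the follower remembers the next sequence number it needs, asks a leader shard copy for a batch of operations from that point, applies them, and advances its position, which is exactly what makes recovery after a failure simple.

```java
import java.util.List;

// Hypothetical types for illustration only; these are not Elasticsearch classes.
record Operation(long seqNo, byte[] source) {}

interface LeaderShardClient {
    /** Returns up to maxBatchSize operations starting at fromSeqNo (inclusive). */
    List<Operation> fetchOperations(long fromSeqNo, int maxBatchSize);
}

final class ShardFollowLoop {
    private final LeaderShardClient leader;
    private long nextSeqNo; // the follower always knows how far it is in the history

    ShardFollowLoop(LeaderShardClient leader, long localCheckpoint) {
        this.leader = leader;
        this.nextSeqNo = localCheckpoint + 1;
    }

    void pullOnce() {
        // any copy of the leader shard (primary or replica) could serve this read
        List<Operation> batch = leader.fetchOperations(nextSeqNo, 1024);
        for (Operation op : batch) {
            apply(op);                  // hand off to the following engine, seq_no intact
            nextSeqNo = op.seqNo() + 1; // on restart, simply resume from nextSeqNo
        }
    }

    private void apply(Operation op) { /* index locally with the leader-assigned seq_no */ }
}
```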

As for automatic setup: for now, users will have to manually configure a following index against a leader index, and will have to bootstrap existing data via snapshot/restore. This does not necessarily mean that we will not have something more sophisticated when the first version ships, only that the initial implementation will not look at anything else. We can consider automatic setup (for example, for time-based indices) and remote recovery infrastructure later.

Utilizing the transport layer allows us to reuse existing infrastructure, and we can follow the path blazed by cross-cluster search for reading from remote clusters.

Basic infrastructure:

  • API to serve operations starting from a given sequence number (see the first sketch after this list)
    • needs to fetch operations by sequence number
    • responds to the client with a batch of the requested size
    • only returns operations below the global checkpoint
  • persistent task for the following index to pull data from the leader index
  • a following engine implementation that can index operations that already carry a sequence number (from the leader); see the second sketch after this list
    • the engine should reject operations that do not have a sequence number
    • the engine will add a primary term
    • this engine will not close gaps in histories upon recoveries; that is expected to be done from the leader
    • we need to carefully consider the tradeoff between keeping the engine open for pluggability and having a key component of the CCR infrastructure in open source
  • implement a mechanism to transfer index metadata changes (e.g., mappings) from the leader to a follower
    • could be done inline with the shard operation stream
    • alternatively, the shard operation stream could indicate the minimum index metadata version required and the following index could wait (through an observer) until the local version catches up
  • CCR REST API for users to set up remote replication
    • design API
    • implement API
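A rough sketch of the first item above, the leader-side read API: serve a batch of operations starting at a requested sequence number, but never return anything at or above the global checkpoint, so followers only ever see operations that are safely replicated on the leader. `OperationHistory`, `Operation`, and `ShardChangesService` are invented names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Invented abstractions for illustration; not actual Elasticsearch APIs.
interface OperationHistory {
    long globalCheckpoint();
    Operation operationAt(long seqNo);
}

record Operation(long seqNo, byte[] source) {}

final class ShardChangesService {
    private final OperationHistory history;

    ShardChangesService(OperationHistory history) {
        this.history = history;
    }

    /** Returns up to maxBatchSize operations from fromSeqNo, strictly below the global checkpoint. */
    List<Operation> readOperations(long fromSeqNo, int maxBatchSize) {
        long globalCheckpoint = history.globalCheckpoint();
        List<Operation> batch = new ArrayList<>();
        for (long seqNo = fromSeqNo; seqNo < globalCheckpoint && batch.size() < maxBatchSize; seqNo++) {
            batch.add(history.operationAt(seqNo));
        }
        return batch;
    }
}
```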
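And a sketch of the second item, the following-engine contract: incoming operations must already carry a leader-assigned sequence number (otherwise they are rejected), and the engine stamps them with its own primary term. Again, all names are hypothetical; the real engine is part of the CCR module.

```java
// Hypothetical sketch; not the actual following-engine implementation.
final class FollowingEngineSketch {
    static final long UNASSIGNED_SEQ_NO = -2; // sentinel for "no seq_no assigned"

    private final long primaryTerm;

    FollowingEngineSketch(long primaryTerm) {
        this.primaryTerm = primaryTerm;
    }

    void index(long seqNo, byte[] source) {
        if (seqNo == UNASSIGNED_SEQ_NO) {
            // the engine should reject operations that do not have a sequence number
            throw new IllegalArgumentException("a following engine only accepts operations with a leader-assigned seq_no");
        }
        persist(seqNo, primaryTerm, source); // the engine adds its own primary term
    }

    private void persist(long seqNo, long primaryTerm, byte[] source) {
        /* write to Lucene; gaps in history are filled from the leader, not closed here */
    }
}
```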

Things to do and investigate:

@elasticmachine elasticmachine added :Distributed Indexing/CCR Issues around the Cross Cluster State Replication features Meta labels Apr 25, 2018
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Apr 25, 2018
The shard changes api returns the minimum IndexMetadata version the follower index needs to have. If the follower side is behind on the IndexMetadata version, the shard follow task holds off processing write operations until the mapping has been fetched from the leader index and applied to the follower index in the background.

The cluster state api is used to fetch the leader mapping, and the put mapping api to apply that mapping in the follower index. This works because the put mapping api accepts fields that are already defined.

Relates to elastic#30086
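The wait described in this commit can be pictured as a small gate, sketched below with invented names: the shard follow task blocks until the follower's applied mapping version reaches the minimum version reported by the shard changes response, while a background fetch-and-apply advances it (the real implementation observes cluster state rather than blocking a thread).

```java
// Illustrative sketch only; invented names, simplified threading.
final class MappingVersionGate {
    private long appliedMappingVersion;

    // called after the leader mapping has been fetched and applied on the follower
    synchronized void mappingApplied(long version) {
        appliedMappingVersion = Math.max(appliedMappingVersion, version);
        notifyAll();
    }

    // called by the shard follow task before processing a batch of write operations
    synchronized void awaitAtLeast(long requiredVersion) throws InterruptedException {
        while (appliedMappingVersion < requiredVersion) {
            wait();
        }
    }
}
```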
@elasticmachine (Collaborator, Author)

Pinging @elastic/es-distributed

dnhatn pushed a commit that referenced this issue May 9, 2018
This commit adds an API to read a translog snapshot from Lucene, then cuts CCR over from the existing translog to the new API.

Relates #30086
Relates #29530
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Jun 5, 2018
* A single ChunksCoordinator is now in charge of following a shard and keeps coordinating until the persistent task has been stopped. Before, a ChunksCoordinator's job was to process a finite number of chunks, after which a new ChunksCoordinator instance would process the next chunks.
* Instead of consuming the chunks queue and waiting for all workers to complete, another background thread continuously adds chunks to the queue, so that the workers never run out of chunks to process while the leader shard has unprocessed write operations.

Relates to elastic#30086
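A toy illustration of the coordinator change, with invented names: one long-lived feeder thread keeps a bounded chunk queue topped up for the workers, instead of a fresh coordinator being scheduled per finite batch of chunks. A real feeder would consult the leader's global checkpoint before enqueueing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy sketch; names are invented and the real coordination logic is richer.
final class ChunkFeeder implements Runnable {
    record Chunk(long fromSeqNo, long toSeqNo) {}

    private final BlockingQueue<Chunk> queue = new LinkedBlockingQueue<>(64); // bounded
    private volatile boolean stopped;
    private long nextSeqNo;

    @Override
    public void run() {
        // keep feeding until the persistent task is stopped; queue.put blocks when the
        // queue is full, so workers never starve while there are chunks left to process
        while (stopped == false) {
            try {
                queue.put(new Chunk(nextSeqNo, nextSeqNo + 1023));
                nextSeqNo += 1024;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() { stopped = true; }

    Chunk nextChunk() throws InterruptedException { return queue.take(); } // worker side
}
```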
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Jun 14, 2018
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Jun 27, 2018
The current shard follow mechanism is complex and does not give us easy ways to gain visibility into the system (e.g., why we are falling behind). The main reason it is complex is that the current design is highly asynchronous. Also, in the current model it is hard to apply backpressure other than by reducing the concurrent reads from the leader shard.

This PR has the following changes:
* Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and a follower shard in a single-threaded manner. This allows for better unit testing and makes it easier to add stats.
* All write operations read from the shard changes api are added to a buffer instead of being sent directly to the bulk shard operations api. This allows us to apply backpressure: a limit controls how many write ops are allowed in the buffer, and no new reads are performed until the number of ops is below that limit.
* The shard changes api includes the current global checkpoint on the leader shard copy. This makes reading a more self-sufficient process, instead of relying on a background thread to fetch the leader shard's global checkpoint.
* Reading write operations from the leader shard (via the shard changes api) is a separate step from writing those operations (via the bulk shard operations api). Before, a read would immediately result in a write.
* The bulk shard operations api returns the local checkpoint on the following primary shard, to keep the shard follow task up to date with what has been written.
* Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
* Moved over the changes from elastic#31242 to make the shard follow mechanism resilient to node and shard failures.

Relates to elastic#30086
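The buffer-based backpressure from this PR can be sketched as follows (invented names; the real task also tracks stats and handles retries): reads fill the buffer, writes drain it, and `canRead()` gates new shard-changes requests on the configured limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch of the read/write split with a bounded buffer.
final class ShardFollowBuffer {
    record Operation(long seqNo, byte[] source) {}

    private final Deque<Operation> buffer = new ArrayDeque<>();
    private final int maxBufferedOps; // the backpressure limit mentioned above

    ShardFollowBuffer(int maxBufferedOps) {
        this.maxBufferedOps = maxBufferedOps;
    }

    // no new reads from the shard changes api while the buffer is over the limit
    synchronized boolean canRead() {
        return buffer.size() < maxBufferedOps;
    }

    // a read response no longer turns directly into a write; it is buffered first
    synchronized void onReadResponse(List<Operation> ops) {
        buffer.addAll(ops);
    }

    // drained by the writer and sent to the bulk shard operations api
    synchronized List<Operation> nextWriteBatch(int maxBatchSize) {
        List<Operation> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && buffer.isEmpty() == false) {
            batch.add(buffer.pollFirst());
        }
        return batch;
    }
}
```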
dnhatn added a commit that referenced this issue Aug 31, 2018
This PR integrates Lucene soft-deletes (LUCENE-8200) into Elasticsearch. Highlights of this PR include:

- Replace hard-deletes with soft-deletes in InternalEngine
- Use _recovery_source if _source is disabled or modified (#31106)
- Soft-deletes retention policy based on the global checkpoint (#30335)
- Read operation history from Lucene instead of the translog (#30120)
- Use Lucene history in peer-recovery (#30522)

Relates #30086
Closes #29530

---
This work was done by the whole team; however, these individuals (in lexical order) made significant contributions in coding and reviewing:

Co-authored-by: Adrien Grand <[email protected]>
Co-authored-by: Boaz Leskes <[email protected]>
Co-authored-by: Jason Tedor <[email protected]>
Co-authored-by: Martijn van Groningen <[email protected]>
Co-authored-by: Nhat Nguyen <[email protected]>
Co-authored-by: Simon Willnauer <[email protected]>
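For readers unfamiliar with LUCENE-8200, here is a standalone sketch of the Lucene primitives this PR builds on (this is not Elasticsearch's actual engine configuration; the field names and the retention point are made up): deletes are recorded in a doc-values field rather than applied as hard deletes, and a retention merge policy keeps soft-deleted documents matching a query, e.g. those at or above a retention point derived from the global checkpoint, so operation history can be read back from Lucene.

```java
import java.nio.file.Files;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.FSDirectory;

public class SoftDeletesSketch {
    public static void main(String[] args) throws Exception {
        final long retainedSeqNo = 0; // stand-in for a retention point from the global checkpoint
        IndexWriterConfig config = new IndexWriterConfig()
            .setSoftDeletesField("__soft_deletes") // deletes become doc-values marks, not hard deletes
            .setMergePolicy(new SoftDeletesRetentionMergePolicy(
                "__soft_deletes",
                // merges must retain soft-deleted docs at or above the retention point
                () -> LongPoint.newRangeQuery("seq_no", retainedSeqNo, Long.MAX_VALUE),
                new TieredMergePolicy()));
        try (IndexWriter writer = new IndexWriter(
                FSDirectory.open(Files.createTempDirectory("soft-deletes")), config)) {
            Document doc = new Document();
            doc.add(new StringField("_id", "1", Field.Store.YES));
            doc.add(new LongPoint("seq_no", 1));
            writer.addDocument(doc);
            // updating softly marks the previous version deleted instead of removing it
            writer.softUpdateDocument(new Term("_id", "1"), doc,
                new NumericDocValuesField("__soft_deletes", 1));
        }
    }
}
```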
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 4, 2018
Improve failure handling of retryable errors by retrying remote calls with exponential backoff. The delay between retries will not be longer than the configured max retry delay. Retryable errors will be retried indefinitely.

Relates to elastic#30086
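A minimal sketch of such a retry policy (invented names; the actual CCR code wires this into its async task framework rather than sleeping a thread): retryable errors are retried forever, with an exponentially growing delay that is capped at the configured maximum.

```java
import java.util.concurrent.Callable;

// Illustrative sketch of capped exponential backoff with unlimited retries.
final class RetryBackoff {
    /** Hypothetical marker for errors that are safe to retry. */
    static final class RetryableException extends RuntimeException {}

    private final long maxRetryDelayMillis; // the configured max retry delay

    RetryBackoff(long maxRetryDelayMillis) {
        this.maxRetryDelayMillis = maxRetryDelayMillis;
    }

    long delayForAttempt(int retryCount) {
        long exponential = 50L << Math.min(retryCount, 20); // 50ms, 100ms, 200ms, ...
        return Math.min(exponential, maxRetryDelayMillis);
    }

    <T> T callWithRetries(Callable<T> remoteCall) throws Exception {
        for (int attempt = 0; ; attempt++) { // retryable errors are retried indefinitely
            try {
                return remoteCall.call();
            } catch (RetryableException e) {
                Thread.sleep(delayForAttempt(attempt));
            }
        }
    }
}
```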
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 8, 2018
For correctness we need to verify that the history uuid of the leader index shards never changes while that index is being followed.

* The history UUIDs are recorded as custom index metadata in the follow index.
* The follow api validates that the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not, the follow api fails.
* While a follow index is following a leader index, shard follow tasks verify on each shard changes api call that their current history uuid is the same as the recorded history uuid.

Relates to elastic#30086
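The invariant can be sketched like this (invented names): record the leader's per-shard history UUIDs when the follow is set up, then fail fast whenever a later shard changes response reports a different UUID, since that would mean the leader's operation history was replaced and the follower's position in it is meaningless.

```java
import java.util.Map;

// Illustrative sketch of the history-uuid check; names are invented.
final class HistoryUuidValidator {
    private final Map<Integer, String> recordedHistoryUuids; // shard id -> uuid at follow time

    HistoryUuidValidator(Map<Integer, String> recordedHistoryUuids) {
        this.recordedHistoryUuids = Map.copyOf(recordedHistoryUuids);
    }

    // invoked on every shard changes api response
    void validate(int shardId, String currentHistoryUuid) {
        String expected = recordedHistoryUuids.get(shardId);
        if (currentHistoryUuid.equals(expected) == false) {
            throw new IllegalStateException("history uuid of leader shard [" + shardId
                + "] changed from [" + expected + "] to [" + currentHistoryUuid + "]");
        }
    }
}
```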
ycombinator added a commit that referenced this issue Sep 12, 2018
Follow up to #33617. Relates to #30086.

As with all other per-index Monitoring collectors, the `CcrStatsCollector` should only collect stats for the indices the user wants to monitor. This list is controlled by the `xpack.monitoring.collection.indices` setting and defaults to all indices.
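Illustratively, the per-index filtering such a collector performs might look like the sketch below (a simplification; the real setting supports full index-pattern semantics): an empty configuration means collect stats for all indices, otherwise only indices matching a configured pattern are collected.

```java
import java.util.List;
import java.util.regex.Pattern;

// Simplified sketch of per-index collection filtering; not the actual collector code.
final class MonitoredIndexFilter {
    private final List<Pattern> patterns;

    MonitoredIndexFilter(List<String> configuredIndices) {
        this.patterns = configuredIndices.stream()
            .map(p -> Pattern.compile(p.replace("*", ".*"))) // crude wildcard-to-regex
            .toList();
    }

    // defaults to all indices when nothing is configured
    boolean shouldCollect(String indexName) {
        return patterns.isEmpty() || patterns.stream().anyMatch(p -> p.matcher(indexName).matches());
    }
}
```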
dnhatn added a commit that referenced this issue Jan 24, 2019
Today, the mapping on the follower is managed and replicated from its
leader index by the ShardFollowTask. Thus, we should prevent users
from modifying the mapping on the follower indices.

Relates #30086