
Uses auto generated timestamp with soft-deletes #33656

Closed
dnhatn opened this issue Sep 13, 2018 · 5 comments
Labels
:Distributed Indexing/CCR (Issues around the Cross Cluster State Replication features)

dnhatn (Member) commented Sep 13, 2018

1. Peer-recovery

Today we don't store the auto-generated timestamp of indexing operations in Lucene, so we always assign -1 to every index operation read from LuceneChangesSnapshot. This looks innocent, but it generates duplicate documents on a replica in the following test.

public void testRetryAppendOnlyInRecoveryAndReplication() throws Exception {
    Settings settings = Settings.builder()
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
        .build();
    try (ReplicationGroup shards = createGroup(0, settings)) {
        shards.startAll();
        final IndexRequest originalRequest = new IndexRequest(
            index.getName(), "type").source("{}", XContentType.JSON);
        originalRequest.process(Version.CURRENT, null, index.getName());
        IndexRequest retryRequest = new IndexRequest();
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            originalRequest.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                retryRequest.readFrom(in);
            }
        }
        retryRequest.onRetry();
        shards.index(retryRequest);
        IndexShard replica = shards.addReplica();
        shards.recoverReplica(replica); // timestamp on replica is -1
        shards.assertAllEqual(1);
        shards.index(originalRequest); // the replica wrongly optimizes this as a fresh append-only
        shards.assertAllEqual(1); // fails: the replica now holds a duplicate document
    }
}

To fix this, we need to assign each index operation read from LuceneChangesSnapshot a timestamp that is at least the (original) auto-generated timestamp of its index request. Here we can use the latest auto-generated timestamp of the Engine.

2. Optimize indexing on a FollowingEngine in CCR

We disable the optimization for index requests whose origin is recovery (retry is always true). To enable this optimization in CCR:

  1. We need to make sure that a FollowingEngine processes an append-only operation only once. This can be done using LocalCheckpointTracker (see the sketch after this list).

  2. We need to store the retry flag in the Lucene index and extend Translog#Index to include this flag. This should be fast with a single-valued DocValues field.
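
A minimal sketch of item 1, assuming LocalCheckpointTracker's existing markSeqNoAsCompleted plus the contains lookup added in the sub-task referenced below; the wrapper class and method names are illustrative, not the actual FollowingEngine code:

import org.elasticsearch.index.seqno.LocalCheckpointTracker;

// Deduplicates append-only operations by seq_no so that an op delivered twice
// (e.g. once via peer recovery and once via replication) is indexed only once.
final class AppendOnlyDedup {
    private final LocalCheckpointTracker tracker;

    AppendOnlyDedup(LocalCheckpointTracker tracker) {
        this.tracker = tracker;
    }

    // Returns true if the op should be indexed, false if this seq_no was
    // already processed and indexing it again would create a duplicate.
    boolean shouldProcess(long seqNo) {
        if (tracker.contains(seqNo)) {
            return false;
        }
        tracker.markSeqNoAsCompleted(seqNo);
        return true;
    }
}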

@s1monw WDYT? /cc @bleskes

This is a subtask of #30086.

dnhatn added the :Distributed Indexing/CCR and team-discuss labels on Sep 13, 2018
elasticmachine (Collaborator) commented

Pinging @elastic/es-distributed

s1monw (Contributor) commented Sep 13, 2018

To fix this, we need to assign each index operation read from LuceneChangesSnapshot a timestamp that is at least the (original) auto-generated timestamp of its index request. Here we can use the latest auto-generated timestamp of the Engine.

I don't think we can do that. We don't know whether the original request had such a timestamp or not; if we assign the wrong value we might miss an update. I think we should rather not try to optimize the peer-recovery case, and instead make sure we propagate the maxUnsafeAutoIdTimestamp from the primary to the replica before we replay the ops to it. That way we can be sure that if the retry or the original request arrives we de-optimize it, and ops that come through peer recovery will be de-optimized as well.
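
A hedged sketch of that propagation; updateMaxUnsafeAutoIdTimestamp mirrors the engine-level setter this would need, while the method around it and the exact recovery hook are assumptions, not the real recovery code:

// Before the translog phase of peer recovery: raise the replica's "unsafe"
// auto-ID watermark to the primary's value, so that every append-only op
// replayed at or below it is de-optimized (treated as a potential retry).
void prepareReplicaForTranslogReplay(org.elasticsearch.index.engine.Engine replicaEngine,
                                     long maxSeenAutoIdTimestampOnPrimary) {
    replicaEngine.updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
}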

We disable the optimization for index requests whose origin is recovery (retry is always true). To enable this optimization in CCR:

I wonder if we can be smarter here. Today we can only optimize for append-only if the ID was auto-generated. Yet for the following engine it's a different game. At the moment we fetch docs from the leader, we could (if we had the information about the highest seq_no that actually did a delete or update in the engine) also optimize documents that have non-auto-generated IDs as append-only. We can then use the seq_no as the timestamp. Once the leader tells us that there is a seq_no X that caused an update or delete, we mark all ops with seq_no <= X as retries. This will give us the right behavior. We also need to make sure that if we fetch docs a second time we mark them as retries. Lemme know if I am missing something.
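
That rule reduces to a small predicate; a minimal sketch, assuming the leader exposes the highest seq_no that performed an update or delete (all names here are illustrative):

// An op fetched from the leader may take the optimized append-only path
// only if it cannot collide with an update/delete (seq_no above the
// watermark) and we have not fetched it before.
boolean canOptimizeAsAppendOnly(long opSeqNo, long maxSeqNoOfUpdatesOrDeletes, boolean fetchedBefore) {
    return fetchedBefore == false && opSeqNo > maxSeqNoOfUpdatesOrDeletes;
}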

dnhatn (Member, Author) commented Sep 13, 2018

I think we should rather not try to optimize the peer-recovery case, and instead make sure we propagate the maxUnsafeAutoIdTimestamp from the primary to the replica before we replay the ops to it.

Yes, I am good with this since we don't have to worry about correctness.

At the moment we fetch docs from the leader, we could (if we had the information about the highest seq_no that actually did a delete or update in the engine) also optimize documents that have non-auto-generated IDs as append-only.

Wow, superb idea. Thanks so much @s1monw.

dnhatn (Member, Author) commented Sep 18, 2018

We will implement an optimization using sequence numbers on the following engine; the sub-tasks are tracked in the commits referenced below.

dnhatn added a commit that referenced this issue Sep 20, 2018
This change adds a "contains" method to LocalCheckpointTracker.
One of the use cases is to check whether a given operation has been
processed in an engine by looking up its seq_no in the LocalCheckpointTracker.

Relates #33656
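
A hedged sketch of what such a contains lookup amounts to; the real LocalCheckpointTracker keeps windowed bit sets, so the checkpoint and processedSeqNosAboveCheckpoint fields here are illustrative simplifications:

// A seq_no has been processed if it is at or below the local checkpoint
// (everything below the checkpoint is complete), or if it was completed
// out of order above the checkpoint.
boolean contains(long seqNo) {
    return seqNo <= checkpoint || processedSeqNosAboveCheckpoint.contains(seqNo);
}
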
dnhatn added a commit that referenced this issue Sep 20, 2018
Today we don't store the auto-generated timestamp of append-only
operations in Lucene, and we assign -1 to every index operation
constructed from LuceneChangesSnapshot. This looks innocent, but it
generates duplicate documents on a replica if a retry append-only
arrives first via peer recovery and then the original append-only
arrives via replication. Since the retry append-only (delivered via
recovery) does not have a timestamp, the replica will happily optimize
the original request when it should not.

This change transmits the max auto-generated timestamp from the primary
to replicas before the translog phase in peer recovery. This timestamp
prevents replicas from optimizing append-only requests whose retry
counterparts have already been processed.

Relates #33656 
Relates #33222
dnhatn added a commit that referenced this issue Sep 22, 2018
This PR is the first step towards using seq_no to optimize indexing
operations. The idea is to track the max seq_no of update or delete ops
on a primary, transfer this information to replicas, and have replicas
use it to optimize the indexing plan for index operations (with an
assigned seq_no).

The max_seq_no_of_updates on the primary is initialized once, when a
primary finishes its local recovery, finishes peer recovery during
relocation, or is promoted. After that, the max_seq_no_of_updates is only
advanced internally inside an engine when processing update or delete
operations.

Relates #33656
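
The watermark itself can be as simple as a monotonic max; a minimal sketch (the class, field, and method names are illustrative, not the engine's actual internals):

import java.util.concurrent.atomic.AtomicLong;

final class MaxSeqNoOfUpdatesTracker {
    // -1 until initialized from recovery or promotion; afterwards it only
    // moves forward, advanced by the engine on update/delete operations.
    private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(-1);

    void advance(long seqNo) {
        maxSeqNoOfUpdatesOrDeletes.accumulateAndGet(seqNo, Math::max);
    }

    long get() {
        return maxSeqNoOfUpdatesOrDeletes.get();
    }
}
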
dnhatn added a commit that referenced this issue Sep 25, 2018
We started tracking the max_seq_no_of_updates on the primary in #33842.
This commit replicates that value from a primary to its replicas in
replication requests and in the translog phase of peer recovery.

With this change, we guarantee that the max_seq_no_of_updates on a
replica, when any index/delete operation is performed, is at least the
max_seq_no_of_updates on the primary when that operation was executed.

Relates #33656
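
Expressed as a hedged invariant check (getMaxSeqNoOfUpdatesOrDeletes stands in for the replica's accessor; the surrounding context and variable names are illustrative):

// When a replica performs an index/delete op, its watermark must already
// cover the primary's watermark from when the op was executed there.
assert replicaEngine.getMaxSeqNoOfUpdatesOrDeletes() >= msuOnPrimaryWhenOpWasExecuted
    : "replica max_seq_no_of_updates lags the primary";
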
dnhatn added a commit that referenced this issue Sep 26, 2018
This commit replicates the max_seq_no_of_updates on the leading index
to the primaries of the following index via ShardFollowNodeTask. The
max_seq_no_of_updates is then transmitted to the replicas of the follower
via replication requests (that is, BulkShardOperationsRequest).

Relates #33656
dnhatn added a commit that referenced this issue Sep 29, 2018
This change introduces the indexing optimization using sequence numbers
in the FollowingEngine. The optimization uses the max_seq_no_of_updates,
which is tracked on the primary of the leader and replicated to replicas
and followers.

Relates #33656
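
A hedged sketch of the resulting indexing plan inside a following engine, reusing the rule from the discussion above; the two plan methods are illustrative placeholders, not the real FollowingEngine API:

// Ops above the replicated watermark cannot collide with any update or
// delete from the leader, so they can be appended without an ID lookup.
void planIndexOp(long opSeqNo) {
    if (opSeqNo > getMaxSeqNoOfUpdatesOrDeletes()) {
        planAsNonDuplicateAppend(opSeqNo); // optimized append-only path
    } else {
        planWithIdLookup(opSeqNo); // safe path: treated like a retry
    }
}
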
dnhatn closed this as completed on Oct 4, 2018
dnhatn (Member, Author) commented Oct 4, 2018

All subtasks have been integrated.

dnhatn added a commit that referenced this issue Nov 7, 2018
A CCR test failure shows that the approach in #34474 is flawed.
Restoring the LocalCheckpointTracker from an index commit can cause both
FollowingEngine and InternalEngine to incorrectly ignore some deletes.

Here is a small scenario illustrating the problem:

1. Delete the doc with seq=1 => the engine adds a delete tombstone to Lucene

2. Flush a commit consisting of only the delete tombstone

3. Index the doc with seq=0 => the engine adds that doc to Lucene, but soft-deleted

4. Restart the engine with the commit from step 2; the engine fills its
LocalCheckpointTracker with the delete tombstone in the commit

5. Replay the local translog in reverse order: index#0, then delete#1

6. When processing index#0, the engine adds it to Lucene as a live doc
and advances the local checkpoint to 1 (seq#1 was restored from the
commit in step 4)

7. When processing delete#1, the engine skips it because seq_no=1 is
less than or equal to the local checkpoint

We should have zero documents after recovering from the translog, but
here we have one.

Since all operations after the local checkpoint of the safe commit are
retained, we can find them if the lookup also considers soft-deleted
documents. This PR closes the disparity between the version map and the
local checkpoint tracker by taking soft-deleted documents into account
when resolving the strategy for engine operations.

Relates #34474
Relates #33656
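
A hedged sketch of the corrected resolution; lookupSeqNoIncludingSoftDeletes and the handler methods are illustrative stand-ins for the engine's real plumbing:

// Resolve an incoming op against what is already indexed, counting
// soft-deleted docs such as tombstones, instead of trusting a checkpoint
// tracker restored from a commit.
void resolveAndProcess(Operation op) {
    Long existingSeqNo = lookupSeqNoIncludingSoftDeletes(op.id()); // null if the id is unseen
    if (existingSeqNo != null && op.seqNo() <= existingSeqNo) {
        markAsStale(op); // an equal-or-newer op for this id already exists, live or tombstoned
    } else {
        process(op);
    }
}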