Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not optimize append-only operation if normal operation with higher seq# was seen #28787

Merged
merged 10 commits into from
Mar 26, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class InternalEngine extends Engine {
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
/**
Expand Down Expand Up @@ -189,7 +190,7 @@ public InternalEngine(EngineConfig engineConfig) {
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
bootstrapAppendOnlyInfoFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
|| openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
Expand Down Expand Up @@ -364,15 +365,16 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}

private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
break;
final String key = entry.getKey();
if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), Long.parseLong(entry.getValue())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question - why the leniency with max?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the max expr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the max is still here for time stamps? maybe assert it's -1 (for both seq# and timestamp)

}
if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
maxSeqNoOfNonAppendOnlyOperations.set(Math.max(maxSeqNoOfNonAppendOnlyOperations.get(), Long.parseLong(entry.getValue())));
}
}
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}

@Override
Expand Down Expand Up @@ -893,11 +895,24 @@ public IndexResult index(Index index) throws IOException {

private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
final IndexingStrategy plan;
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
// no need to deal with out of order delivery - we never saw this one
final boolean appendOnlyRequest = canOptimizeAddDocument(index);
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
/*
* As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue
* a follow-up operation on it. In rare cases, the follow-up operation can arrive and be processed on a replica before
* the original append-only. In this case we can't simply proceed with the append only without consulting the version map.
* If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen
* the document of that append-only request. However, if the seqno of an append-only request is higher than the seqno of any
* non-append-only request, we can assert that the replica has not seen the document of that append-only request, and thus
* we can safely apply the optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
} else {
if (appendOnlyRequest == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we introduce a method similar to mayHaveBeenIndexedBefore that does the check and also updates the maxSeqNoOfNonAppendOnlyOperations? I think it's good to have both marker handling consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently @s1monw preferred the reverse. I'm fine with leaving it as is.

maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
Expand Down Expand Up @@ -1032,6 +1047,11 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
return mayHaveBeenIndexBefore;
}

// Visible for testing only: returns the highest sequence number of any non-append-only
// operation (an indexing operation with an id, or a delete) observed by this engine, as
// tracked in {@code maxSeqNoOfNonAppendOnlyOperations}.
long getMaxSeqNoOfNonAppendOnlyOperations() {
    return maxSeqNoOfNonAppendOnlyOperations.get();
}

private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
Expand Down Expand Up @@ -1187,6 +1207,9 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
// unlike the primary, replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4572,4 +4572,92 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup
}
}
}

/**
 * Verifies that the engine tracks the max seq_no of non-append-only operations: replica-origin
 * index/delete operations with explicit seq_nos must advance the marker, while primary-origin
 * operations and append-only operations must not. Also verifies that the marker is restored
 * from the commit when the engine is reopened.
 */
public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception {
    IOUtils.close(engine, store);
    store = createStore();
    final Path translogPath = createTempDir();
    final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
    try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
        final CountDownLatch latch = new CountDownLatch(1);
        // Background thread that concurrently indexes append-only documents (both replica- and
        // primary-origin); none of these should affect the non-append-only max_seqno marker.
        final Thread appendOnlyIndexer = new Thread(() -> {
            try {
                latch.countDown();
                final int numDocs = scaledRandomIntBetween(100, 1000);
                for (int i = 0; i < numDocs; i++) {
                    ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null);
                    if (randomBoolean()) {
                        engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo()));
                    } else {
                        engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong()));
                    }
                }
            } catch (Exception ex) {
                throw new RuntimeException("Failed to index", ex);
            }
        });
        appendOnlyIndexer.setName("append-only indexer");
        appendOnlyIndexer.start();
        // Wait until the indexer thread has started before issuing the competing operations.
        latch.await();
        long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED;
        final int numOps = scaledRandomIntBetween(100, 1000);
        for (int i = 0; i < numOps; i++) {
            ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
            if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations
                final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
                final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean());
                if (randomBoolean()) {
                    engine.index(doc);
                } else {
                    engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
                        doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
                }
                // Seq_nos are generated in increasing order, so the last replica-origin op holds the max.
                maxSeqNoOfNonAppendOnly = seqno;
            } else { // On primary - do not update max_seqno for non-append-only operations
                if (randomBoolean()) {
                    engine.index(indexForDoc(parsedDocument));
                } else {
                    engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id())));
                }
            }
        }
        appendOnlyIndexer.join(120_000);
        assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly));
        // Advance the global checkpoint to the local checkpoint so the flushed commit becomes the
        // safe commit from which the marker is bootstrapped on reopen.
        globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
        engine.syncTranslog();
        engine.flush();
    }
    try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
        assertThat("max_seqno from non-append-only was not bootstrap from the safe commit",
            engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get()));
    }
}

public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception {
long lookupTimes = 0L;
final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker();
final int initDocs = between(0, 10);
for (int i = 0; i < initDocs; i++) {
index(engine, i);
lookupTimes++;
}
// doc1 is delayed and arrived after a non-append-only op.
final long seqNoDoc1 = localCheckpointTracker.generateSeqNo();
Engine.IndexResult regularDoc = engine.index(replicaIndexForDoc(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also test a delete?

testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, localCheckpointTracker.generateSeqNo(), false));
lookupTimes++;
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); //
assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(regularDoc.getSeqNo()));

// should not optimize for doc1 and process as a regular doc (eg. look up in version map)
engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), seqNoDoc1));
lookupTimes++;
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));

// optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map.
engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkItemRequest;
Expand All @@ -30,11 +31,13 @@
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.ResyncReplicationRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -609,6 +612,13 @@ private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardRespo
return result;
}

/**
 * Wraps the given write request in a single-item bulk shard request, executes it on the
 * primary, and returns the resulting request to be replayed on the replicas.
 */
private <Request extends ReplicatedWriteRequest & DocWriteRequest>
    BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception {
    final BulkItemRequest[] items = {new BulkItemRequest(0, request)};
    final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), items);
    return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest();
}

private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception {
final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
Expand All @@ -618,13 +628,14 @@ private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest requ
* indexes the given requests on the supplied primary, modifying it for replicas
*/
BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request);
BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1];
bulkItemRequests[0] = bulkItemRequest;
final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests);
final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
executeShardBulkOnPrimary(primary, bulkShardRequest);
return result.replicaRequest();
return executeReplicationRequestOnPrimary(primary, request);
}

/**
 * Executes the delete request on the primary, and modifies it for replicas.
 *
 * @return the bulk shard request to be executed on the replicas via {@code deleteOnReplica}
 */
BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
    // Delegates to the shared single-item bulk path so deletes and indexes are handled uniformly.
    return executeReplicationRequestOnPrimary(primary, request);
}

/**
Expand All @@ -634,6 +645,13 @@ void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Excepti
executeShardBulkOnReplica(replica, request);
}

/**
 * Executes the delete request on the given replica shard.
 * The request is expected to be the replica request produced by {@code deleteOnPrimary}.
 */
void deleteOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
    executeShardBulkOnReplica(replica, request);
}

class GlobalCheckpointSync extends ReplicationAction<
GlobalCheckpointSyncAction.Request,
GlobalCheckpointSyncAction.Request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
Expand Down Expand Up @@ -58,7 +60,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -368,6 +369,27 @@ public void testSeqNoCollision() throws Exception {
}
}

/**
 * This test ensures consistency between primary and replica when a non-append-only operation
 * (e.g. an index request with an id, or a delete) on a document is processed on a replica before
 * the original append-only request for that document. The append-only document can be exposed
 * and deleted on the primary before it is added to a replica. Replicas should treat a late
 * append-only request as a regular index request rather than applying the append-only optimization.
 */
public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
    try (ReplicationGroup shards = createGroup(1)) {
        shards.startAll();
        final IndexShard primary = shards.getPrimary();
        final IndexShard replica = shards.getReplicas().get(0);
        // Append-only request - without id
        final BulkShardRequest indexRequest = indexOnPrimary(
            new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary);
        final String docId = Iterables.get(getShardDocUIDs(primary), 0);
        final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary);
        // Deliver the delete to the replica before the original append-only index (out of order).
        deleteOnReplica(deleteRequest, replica);
        indexOnReplica(indexRequest, replica);
        // If the replica had wrongly optimized the late append-only request, the deleted document
        // would resurface there; both copies must end up with zero documents.
        shards.assertAllEqual(0);
    }
}

/** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage;
Expand Down