Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Read changes from Lucene instead of translog #30120

Merged
merged 24 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
Expand Down Expand Up @@ -609,6 +610,12 @@ public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seqno in the requesting seqno range
*/
public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException;

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -148,6 +149,7 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
private final boolean softDeleteEnabled;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
Expand Down Expand Up @@ -224,6 +226,8 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -2342,6 +2346,17 @@ long getNumDocUpdates() {
return numDocUpdates.count();
}

public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
// TODO: Should we defer the refresh until we really need it?
ensureOpen();
if (lastRefreshedCheckpoint() < maxSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
return new LuceneChangesSnapshot(acquireSearcher(source, SearcherScope.INTERNAL), mapperService,
minSeqNo, maxSeqNo, requiredFullRange);
}

@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();
Expand Down Expand Up @@ -2388,4 +2403,28 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends
return super.softUpdateDocuments(term, docs, softDeletes);
}
}

/**
* Returned the last local checkpoint value has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;
LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
}
@Override
public void beforeRefresh() {
pendingCheckpoint = localCheckpointTracker.getCheckpoint(); // All change until this point should be visible after refresh
}
@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
refreshedCheckpoint.set(pendingCheckpoint);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.translog.Translog;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/**
* A {@link Translog.Snapshot} from changes in a Lucene index
*/
final class LuceneChangesSnapshot implements Translog.Snapshot {
private final long fromSeqNo, toSeqNo;
private long lastSeenSeqNo;
private int skippedOperations;
private final boolean requiredFullRange;

private final IndexSearcher indexSearcher;
private final MapperService mapperService;
private int docIndex;
private final TopDocs topDocs;

private final Closeable onClose;

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
*
* @param engineSearcher the internal engine searcher - this snapshot will take over the provided searcher
* @param mapperService the mapper service which will be mainly used to resolve the document's type and uid
* @param fromSeqNo the min requesting seq# - inclusive
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
this.mapperService = mapperService;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
this.lastSeenSeqNo = fromSeqNo - 1;
this.requiredFullRange = requiredFullRange;
boolean success = false;
try {
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.topDocs = searchOperations(indexSearcher);
this.onClose = engineSearcher;
success = true;
} finally {
if (success == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be handled on the caller side? We are not responsible for this reference to engine searcher unless fully constructued?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

IOUtils.close(engineSearcher);
}
}
}

@Override
public void close() throws IOException {
onClose.close();
}

@Override
public int totalOperations() {
return Math.toIntExact(topDocs.totalHits);
}

@Override
public int overriddenOperations() {
return skippedOperations;
}

@Override
public Translog.Operation next() throws IOException {
final Translog.Operation op = nextOp();
if (requiredFullRange && lastSeenSeqNo != toSeqNo) {
final long expectedSeqNo = lastSeenSeqNo + 1;
if (op == null || op.seqNo() != expectedSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
}
}
if (op != null) {
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
"last_seen_seqno [" + lastSeenSeqNo + "], from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "], op [" + op + "]";
lastSeenSeqNo = op.seqNo();
}
return op;
}

private Translog.Operation nextOp() throws IOException {
final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
for (; docIndex < scoreDocs.length; docIndex++) {
if (scoreDocs[docIndex].doc == DocIdSetIterator.NO_MORE_DOCS) {
return null;
}
final Translog.Operation op = readDocAsOp(scoreDocs[docIndex].doc);
if (op != null) {
return op;
}
}
return null;
}

private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Sort sortedBySeqNoThenByTerm = new Sort(
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed - this should be needed in the future. Maybe we should remove it and instead assert that we never have duplicate seq#

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I miss something here because I think we need it for now but not in the future after we have a Lucene rollback. I will reach out to discuss this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry but I dropped a not in my comment. "this should not be needed in the future." . It's only relevant in cases where the primary dies while indexing is ongoing and we have more than 1 replica. In these cases this primary sort doesn't help because you also need some kind of a deduping mechanism to realy make it work. Such deduping is fairly easy to implement but I'm on the fence to whether we should.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have dedup in this PR already (line 161-163). The lastSeenSeqNo is used for dedup and range check. I am fine to remove the primary sort and dedup mechanism.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I missed it. I think it's surprising to put it in readDocAsOp and shortcut. I'd prefer to do it in next where do all our state updates and then everything together. it's rare anyway and doesn't require optimization imo. That said, it's all nits. If you prefer it otherwise I'm good. Thanks for clarifying.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we should not mutate anything in readDocAsOp. I will update this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I moved this to next but we also need to dudup for nested docs then I moved this to readDocAsOp again. I think we should optimize for nested docs. I am open to suggestions here.

);
return searcher.search(rangeQuery, Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
}

private Translog.Operation readDocAsOp(int docID) throws IOException {
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
final int segmentDocID = docID - leaf.docBase;
final long seqNo = readNumericDV(leaf, SeqNoFieldMapper.NAME, segmentDocID);
// This operation has seen and will be skipped anyway - do not visit other fields.
if (seqNo == lastSeenSeqNo) {
skippedOperations++;
return null;
}

final long primaryTerm = readNumericDV(leaf, SeqNoFieldMapper.PRIMARY_TERM_NAME, segmentDocID);
final FieldsVisitor fields = new FieldsVisitor(true);
indexSearcher.doc(docID, fields);
fields.postProcess(mapperService);

final Translog.Operation op;
final boolean isTombstone = isTombstoneOperation(leaf, segmentDocID);
if (isTombstone && fields.uid() == null) {
op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to say yes? It's very rare and it feels like a good debugging tool. I wonder what other people think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make it in a follow-up.

assert readNumericDV(leaf, Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
: "Noop operation but soft_deletes field is not set [" + op + "]";
} else {
final String id = fields.uid().id();
final String type = fields.uid().type();
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
final long version = readNumericDV(leaf, VersionFieldMapper.NAME, segmentDocID);
if (isTombstone) {
op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
assert readNumericDV(leaf, Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
: "Delete operation but soft_deletes field is not set [" + op + "]";
} else {
final BytesReference source = fields.source();
// TODO: pass the latest timestamp from engine.
final long autoGeneratedIdTimestamp = -1;
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
}
}
return op;
}

private boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should just take a LeafReader?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we want to pull the tombstoneDV in the ctor next to List<LeafReaderContext> leaves and a List<NumericDocValues> for seqIds... I think this would be nice and prevent getting stuff from the reader over and over again.

final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can pull all these in the constructor into an array that we can access by index of the leaf reader. this is how we do things in lucene for stuff we access frequently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw
I tried but realized that NumericDocValues#advanceExact method requires increasing docID values but it's not the case here. Do you have any suggestion for this?

  /** Advance the iterator to exactly {@code target} and return whether
   *  {@code target} has a value.
   *  {@code target} must be greater than or equal to the current
   *  {@link #docID() doc ID} and must be a valid doc ID, ie. &ge; 0 and
   *  &lt; {@code maxDoc}.
   *  After this method returns, {@link #docID()} retuns {@code target}. */
  public abstract boolean advanceExact(int target) throws IOException;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I need to reset the DV :)

if (tombstoneDV != null && tombstoneDV.advanceExact(segmentDocID)) {
return tombstoneDV.longValue() == 1;
}
return false;
}

private long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should just take a LeafReader?

final NumericDocValues dv = leaf.reader().getNumericDocValues(field);
if (dv == null || dv.advanceExact(segmentDocID) == false) {
throw new IllegalStateException("DocValues for field [" + field + "] is not found");
}
return dv.longValue();
}
}
22 changes: 17 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1595,11 +1595,7 @@ public Closeable acquireTranslogRetentionLock() {
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
}

public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
return getEngine().newTranslogSnapshotBetween(minSeqNo, maxSeqNo);
return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
}

/**
Expand All @@ -1609,6 +1605,22 @@ public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo);
}

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seqno is between minSeqNo and maxSeqNo.
* The caller has to close the returned snapshot after finishing the reading.
*
* @param source the source of the request
* @param minSeqNo the min_seqno to read - inclusive
* @param maxSeqNo the max_seqno to read - inclusive
* @param requiredFullRange if true then {@link Translog.Snapshot#next()} will throw {@link IllegalStateException}
* if any operation between minSeqNo and maxSeqNo is missing. This parameter should be only
* enabled when the requesting range is below the global checkpoint.
*/
public Translog.Snapshot newLuceneChangesSnapshot(String source, long minSeqNo, long maxSeqNo,
boolean requiredFullRange) throws IOException {
return getEngine().newLuceneChangesSnapshot(source, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
}

public List<Segment> segments(boolean verbose) {
return getEngine().segments(verbose);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1545,42 +1545,6 @@ public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, Interrup
assertVisibleCount(engine, totalExpectedOps);
}

private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch startGun = new CountDownLatch(thread.length);
AtomicInteger offset = new AtomicInteger(-1);
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread(() -> {
startGun.countDown();
try {
startGun.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
int docOffset;
while ((docOffset = offset.incrementAndGet()) < ops.size()) {
try {
final Engine.Operation op = ops.get(docOffset);
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else {
engine.delete((Engine.Delete) op);
}
if ((docOffset + 1) % 4 == 0) {
engine.refresh("test");
}
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
thread[i].start();
}
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
}

public void testInternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1");
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
Expand Down
Loading