Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose translog stats in ReadOnlyEngine #43752

Merged
merged 4 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,51 @@ setup:
indices.stats:
metric: [ translog ]
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }

---
"Translog stats on closed indices":
- skip:
version: " - 7.2.99"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should skip this test until 7.9.99 then adjust to this value after backporting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird that this did not fail the tests? Perhaps BWC is disabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pfff... I should have seen this. And yes, bwc tests were disabled at that time. I merged master and bwc are enabled again.

reason: "closed indices have translog stats starting version 7.3.0"

- do:
index:
index: test
id: 1
body: { "foo": "bar" }

- do:
index:
index: test
id: 2
body: { "foo": "bar" }

- do:
index:
index: test
id: 3
body: { "foo": "bar" }

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }

- do:
indices.close:
index: test
- is_true: acknowledged

- do:
cluster.health:
expand_wildcards: all
wait_for_no_initializing_shards: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only needed for the backport.
Alternatively you could use wait_for_active_shards = 1 on the indices.close action

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right


- do:
indices.stats:
metric: [ translog ]
expand_wildcards: all
forbid_closed_indices: false
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;

import java.io.Closeable;
Expand Down Expand Up @@ -80,6 +82,7 @@ public class ReadOnlyEngine extends Engine {
private final DocsStats docsStats;
private final RamAccountingSearcherFactory searcherFactory;


tlrx marked this conversation as resolved.
Show resolved Hide resolved
/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
* read-write engine. It allows to optionally obtain the writer locks for the shard which would time-out if another
Expand Down Expand Up @@ -108,7 +111,6 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand All @@ -119,6 +121,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
reader = wrapReader(reader, readerWrapperFunction);
searcherManager = new SearcherManager(reader, searcherFactory);
this.docsStats = docsStats(lastCommittedSegmentInfos);
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert that if translogStats is null then obtainLock is true? This ensures that we don't open two translog instances at the same time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ especially as opening the translog here writes a new checkpoint file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very good suggestion, thanks

this.indexWriterLock = indexWriterLock;
success = true;
} finally {
Expand Down Expand Up @@ -216,6 +219,26 @@ private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos info
return new SeqNoStats(maxSeqNo, localCheckpoint, config.getGlobalCheckpointSupplier().getAsLong());
}

private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
return translog.stats();
}
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;

import java.io.IOException;
import java.util.List;
Expand All @@ -37,6 +38,7 @@

import static org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader.getElasticsearchDirectoryReader;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;

public class ReadOnlyEngineTests extends EngineTestCase {
Expand Down Expand Up @@ -183,7 +185,7 @@ public void testReadOnly() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;
expectThrows(expectedException, () -> readOnlyEngine.index(null));
Expand All @@ -204,7 +206,7 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
globalCheckpoint.set(randomNonNegativeLong());
try {
readOnlyEngine.verifyEngineBeforeIndexClosing();
Expand Down Expand Up @@ -242,4 +244,46 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
}
}
}

public void testTranslogStats() throws IOException {
IOUtils.close(engine, store);
try (Store store = createStore()) {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);

final int numDocs = frequently() ? scaledRandomIntBetween(10, 200) : 0;
int uncommittedDocs = 0;

try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
if (rarely()) {
engine.flush();
uncommittedDocs = 0;
} else {
uncommittedDocs += 1;
}
globalCheckpoint.set(i);
}

assertThat(engine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));

engine.flush(true, true);
}

try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -34,6 +36,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
Expand All @@ -54,6 +57,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class OpenCloseIndexIT extends ESIntegTestCase {
public void testSimpleCloseOpen() {
Expand Down Expand Up @@ -346,4 +350,38 @@ public void testOpenCloseIndexWithBlocks() {
}
}
}

public void testTranslogStats() {
final String indexName = "test";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final int nbDocs = randomIntBetween(0, 50);
int uncommittedOps = 0;
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));

if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
uncommittedOps = 0;
} else {
uncommittedOps += 1;
}
}

IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));

assertAcked(client().admin().indices().prepareClose("test"));

IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN;
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,39 @@ public void testRecoveryState() {
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
}
}

public void testTranslogStats() {
final String indexName = "test";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final int nbDocs = randomIntBetween(0, 50);
int uncommittedOps = 0;
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));

if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
uncommittedOps = 0;
} else {
uncommittedOps += 1;
}
}

IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new TransportFreezeIndexAction.FreezeRequest(indexName)).actionGet());
assertIndexFrozen(indexName);

IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN;
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
---
setup:
- do:
indices.create:
index: test
- do:
cluster.health:
wait_for_no_initializing_shards: true

---
"Translog stats on frozen indices":
- skip:
version: " - 7.2.99"
reason: "frozen indices have translog stats starting version 7.3.0"

- do:
index:
index: test
id: 1
body: { "foo": "bar" }

- do:
index:
index: test
id: 2
body: { "foo": "bar" }

- do:
index:
index: test
id: 3
body: { "foo": "bar" }

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }

# freeze index
- do:
indices.freeze:
index: test
- is_true: acknowledged

- do:
cluster.health:
wait_for_no_initializing_shards: true

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

# unfreeze index
- do:
indices.freeze:
index: test
- is_true: acknowledged

- do:
cluster.health:
wait_for_no_initializing_shards: true

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }