Skip to content

Commit

Permalink
Expose translog stats in ReadOnlyEngine (elastic#43752)
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Jul 1, 2019
1 parent 40d43e3 commit 50fae17
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,46 @@ setup:
indices.stats:
metric: [ translog ]
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }

---
"Translog stats on closed indices":
- skip:
version: " - 7.9.99"
reason: "closed indices have translog stats starting version 8.0.0"

- do:
index:
index: test
id: 1
body: { "foo": "bar" }

- do:
index:
index: test
id: 2
body: { "foo": "bar" }

- do:
index:
index: test
id: 3
body: { "foo": "bar" }

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }

- do:
indices.close:
index: test
- is_true: acknowledged

- do:
indices.stats:
metric: [ translog ]
expand_wildcards: all
forbid_closed_indices: false
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;

import java.io.Closeable;
Expand Down Expand Up @@ -108,7 +110,6 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand All @@ -119,6 +120,8 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
reader = wrapReader(reader, readerWrapperFunction);
searcherManager = new SearcherManager(reader, searcherFactory);
this.docsStats = docsStats(lastCommittedSegmentInfos);
assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
success = true;
} finally {
Expand Down Expand Up @@ -216,6 +219,26 @@ private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos info
return new SeqNoStats(maxSeqNo, localCheckpoint, config.getGlobalCheckpointSupplier().getAsLong());
}

private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
return translog.stats();
}
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;

import java.io.IOException;
import java.util.List;
Expand All @@ -37,6 +38,7 @@

import static org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader.getElasticsearchDirectoryReader;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;

public class ReadOnlyEngineTests extends EngineTestCase {
Expand Down Expand Up @@ -183,7 +185,7 @@ public void testReadOnly() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;
expectThrows(expectedException, () -> readOnlyEngine.index(null));
Expand All @@ -204,7 +206,7 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
globalCheckpoint.set(randomNonNegativeLong());
try {
readOnlyEngine.verifyEngineBeforeIndexClosing();
Expand Down Expand Up @@ -242,4 +244,46 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
}
}
}

public void testTranslogStats() throws IOException {
IOUtils.close(engine, store);
try (Store store = createStore()) {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);

final int numDocs = frequently() ? scaledRandomIntBetween(10, 200) : 0;
int uncommittedDocs = 0;

try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
if (rarely()) {
engine.flush();
uncommittedDocs = 0;
} else {
uncommittedDocs += 1;
}
globalCheckpoint.set(i);
}

assertThat(engine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));

engine.flush(true, true);
}

try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4083,7 +4083,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, false, Function.identity()) {
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
@Override
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
// just like a following shard, we need to skip this check for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -34,6 +36,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
Expand All @@ -54,6 +57,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class OpenCloseIndexIT extends ESIntegTestCase {
public void testSimpleCloseOpen() {
Expand Down Expand Up @@ -346,4 +350,38 @@ public void testOpenCloseIndexWithBlocks() {
}
}
}

public void testTranslogStats() {
final String indexName = "test";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final int nbDocs = randomIntBetween(0, 50);
int uncommittedOps = 0;
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));

if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
uncommittedOps = 0;
} else {
uncommittedOps += 1;
}
}

IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));

assertAcked(client().admin().indices().prepareClose("test"));

IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN;
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,39 @@ public void testRecoveryState() throws ExecutionException, InterruptedException
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
}
}

public void testTranslogStats() {
final String indexName = "test";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final int nbDocs = randomIntBetween(0, 50);
int uncommittedOps = 0;
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));

if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
uncommittedOps = 0;
} else {
uncommittedOps += 1;
}
}

IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new TransportFreezeIndexAction.FreezeRequest(indexName)).actionGet());
assertIndexFrozen(indexName);

IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN;
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
---
setup:
- do:
indices.create:
index: test
- do:
cluster.health:
wait_for_no_initializing_shards: true

---
"Translog stats on frozen indices":
- skip:
version: " - 7.9.99"
reason: "frozen indices have translog stats starting version 8.0.0"

- do:
index:
index: test
id: 1
body: { "foo": "bar" }

- do:
index:
index: test
id: 2
body: { "foo": "bar" }

- do:
index:
index: test
id: 3
body: { "foo": "bar" }

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }

# freeze index
- do:
indices.freeze:
index: test
- is_true: acknowledged

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

# unfreeze index
- do:
indices.freeze:
index: test
- is_true: acknowledged

- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

0 comments on commit 50fae17

Please sign in to comment.