Skip to content

Commit

Permalink
Expose retention leases in shard stats (#37991)
Browse files Browse the repository at this point in the history
This commit exposes retention leases via shard-level stats.
  • Loading branch information
jasontedor committed Jan 30, 2019
1 parent b13ae57 commit 24c650a
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -106,21 +107,25 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
// only report on fully started shards
CommitStats commitStats;
SeqNoStats seqNoStats;
RetentionLeaseStats retentionLeaseStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
retentionLeaseStats = indexShard.getRetentionLeaseStats();
} catch (final AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
retentionLeaseStats = null;
}
shardsStats.add(
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
commitStats,
seqNoStats));
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
commitStats,
seqNoStats,
retentionLeaseStats));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,58 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardPath;

import java.io.IOException;

public class ShardStats implements Streamable, Writeable, ToXContentFragment {

private ShardRouting shardRouting;
private CommonStats commonStats;
@Nullable
private CommitStats commitStats;
@Nullable
private SeqNoStats seqNoStats;

@Nullable
private RetentionLeaseStats retentionLeaseStats;

/**
* Gets the current retention lease stats.
*
* @return the current retention lease stats
*/
public RetentionLeaseStats getRetentionLeaseStats() {
return retentionLeaseStats;
}

private String dataPath;
private String statePath;
private boolean isCustomDataPath;

ShardStats() {
}

public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) {
public ShardStats(
final ShardRouting routing,
final ShardPath shardPath,
final CommonStats commonStats,
final CommitStats commitStats,
final SeqNoStats seqNoStats,
final RetentionLeaseStats retentionLeaseStats) {
this.shardRouting = routing;
this.dataPath = shardPath.getRootDataPath().toString();
this.statePath = shardPath.getRootStatePath().toString();
this.isCustomDataPath = shardPath.isCustomDataPath();
this.commitStats = commitStats;
this.commonStats = commonStats;
this.seqNoStats = seqNoStats;
this.retentionLeaseStats = retentionLeaseStats;
}

/**
Expand Down Expand Up @@ -109,6 +130,9 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
}
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new);
}
}

@Override
Expand All @@ -122,6 +146,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeOptionalWriteable(seqNoStats);
}
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeOptionalWriteable(retentionLeaseStats);
}
}

@Override
Expand All @@ -140,6 +167,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (seqNoStats != null) {
seqNoStats.toXContent(builder, params);
}
if (retentionLeaseStats != null) {
retentionLeaseStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARD_PATH);
builder.field(Fields.STATE_PATH, statePath);
builder.field(Fields.DATA_PATH, dataPath);
Expand All @@ -159,4 +189,5 @@ static final class Fields {
static final String NODE = "node";
static final String RELOCATING_NODE = "relocating_node";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -166,18 +167,25 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh

CommitStats commitStats;
SeqNoStats seqNoStats;
RetentionLeaseStats retentionLeaseStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
retentionLeaseStats = indexShard.getRetentionLeaseStats();
} catch (final AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
retentionLeaseStats = null;
}

return new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), commitStats, seqNoStats);
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
commitStats,
seqNoStats,
retentionLeaseStats);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -242,4 +244,14 @@ public String toString() {
'}';
}

/**
* A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease.
*
* @param leases the leases
* @return the map from retention lease ID to retention lease
*/
static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;

/**
* Represents retention lease stats.
*/
public final class RetentionLeaseStats implements ToXContentFragment, Writeable {

private final Collection<RetentionLease> leases;

/**
* The underlying retention leases backing this stats object.
*
* @return the leases
*/
public Collection<RetentionLease> leases() {
return leases;
}

/**
* Constructs a new retention lease stats object from the specified leases.
*
* @param leases the leases
*/
public RetentionLeaseStats(final Collection<RetentionLease> leases) {
this.leases = Objects.requireNonNull(leases);
}

/**
* Constructs a new retention lease stats object from a stream. The retention lease stats should have been written via
* {@link #writeTo(StreamOutput)}.
*
* @param in the stream to construct the retention lease stats from
* @throws IOException if an I/O exception occurs reading from the stream
*/
public RetentionLeaseStats(final StreamInput in) throws IOException {
leases = in.readList(RetentionLease::new);
}

/**
* Writes a retention lease stats object to a stream in a manner suitable for later reconstruction via
* {@link #RetentionLeaseStats(StreamInput)} (StreamInput)}.
*
* @param out the stream to write the retention lease stats to
* @throws IOException if an I/O exception occurs writing to the stream
*/
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeCollection(leases);
}

/**
* Converts the retention lease stats to {@link org.elasticsearch.common.xcontent.XContent} using the specified builder and pararms.
*
* @param builder the builder
* @param params the params
* @return the builder that these retention leases were converted to {@link org.elasticsearch.common.xcontent.XContent} into
* @throws IOException if an I/O exception occurs writing to the builder
*/
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject("retention_leases");
{
builder.startArray("leases");
{
for (final RetentionLease retentionLease : leases) {
builder.startObject();
{
builder.field("id", retentionLease.id());
builder.field("retaining_seq_no", retentionLease.retainingSequenceNumber());
builder.field("timestamp", retentionLease.timestamp());
builder.field("source", retentionLease.source());
}
builder.endObject();
}
}
builder.endArray();
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final RetentionLeaseStats that = (RetentionLeaseStats) o;
return Objects.equals(leases, that.leases);
}

@Override
public int hashCode() {
return Objects.hash(leases);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
Expand Down Expand Up @@ -1930,6 +1931,11 @@ public Collection<RetentionLease> getRetentionLeases() {
return replicationTracker.getRetentionLeases();
}

public RetentionLeaseStats getRetentionLeaseStats() {
verifyNotClosed();
return new RetentionLeaseStats(getRetentionLeases());
}

/**
* Adds a new retention lease.
*
Expand Down
23 changes: 15 additions & 8 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
Expand Down Expand Up @@ -381,23 +382,29 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index

CommitStats commitStats;
SeqNoStats seqNoStats;
RetentionLeaseStats retentionLeaseStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
retentionLeaseStats = indexShard.getRetentionLeaseStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
retentionLeaseStats = null;
}

return new IndexShardStats(indexShard.shardId(),
new ShardStats[] {
new ShardStats(indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
commitStats,
seqNoStats)
});
return new IndexShardStats(
indexShard.shardId(),
new ShardStats[]{
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
commitStats,
seqNoStats,
retentionLeaseStats)
});
}

/**
Expand Down
Loading

0 comments on commit 24c650a

Please sign in to comment.