Skip to content

Commit

Permalink
Adding bytes transfered in each segrep events and additional metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Jan 9, 2023
1 parent 2eed8a8 commit 88cf749
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.indices.replication.SegmentReplicationStatsState;
import org.opensearch.indices.replication.SegmentReplicationState;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class SegmentReplicationResponse extends BroadcastResponse {
private final Map<String, List<SegmentReplicationStatsState>> shardSegmentReplicationStatsStates;
private final Map<String, List<SegmentReplicationState>> shardSegmentReplicationStates;

public SegmentReplicationResponse(StreamInput in) throws IOException {
super(in);
shardSegmentReplicationStatsStates = in.readMapOfLists(StreamInput::readString, SegmentReplicationStatsState:: new);
shardSegmentReplicationStates = in.readMapOfLists(StreamInput::readString, SegmentReplicationState:: new);
}

/**
Expand All @@ -35,42 +35,42 @@ public SegmentReplicationResponse(StreamInput in) throws IOException {
* @param totalShards Total count of shards seen
* @param successfulShards Count of shards successfully processed
* @param failedShards Count of shards which failed to process
* @param shardSegmentReplicationStatsStates Map of indices to shard recovery information
* @param shardSegmentReplicationStates Map of indices to shard recovery information
* @param shardFailures List of failures processing shards
*/
public SegmentReplicationResponse(
int totalShards,
int successfulShards,
int failedShards,
Map<String, List<SegmentReplicationStatsState>> shardSegmentReplicationStatsStates,
Map<String, List<SegmentReplicationState>> shardSegmentReplicationStates,
List<DefaultShardOperationFailedException> shardFailures
) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shardSegmentReplicationStatsStates = shardSegmentReplicationStatsStates;
this.shardSegmentReplicationStates = shardSegmentReplicationStates;
}

public boolean hasSegmentReplication() {
return shardSegmentReplicationStatsStates.size() > 0;
return shardSegmentReplicationStates.size() > 0;
}

public Map<String, List<SegmentReplicationStatsState>> shardSegmentReplicationStatsStates() {
return shardSegmentReplicationStatsStates;
public Map<String, List<SegmentReplicationState>> shardSegmentReplicationStates() {
return shardSegmentReplicationStates;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (hasSegmentReplication()) {
for (String index : shardSegmentReplicationStatsStates.keySet()) {
List<SegmentReplicationStatsState> segmentReplicationStatsStates = shardSegmentReplicationStatsStates.get(index);
if (segmentReplicationStatsStates == null || segmentReplicationStatsStates.size() == 0) {
for (String index : shardSegmentReplicationStates.keySet()) {
List<SegmentReplicationState> segmentReplicationStates = shardSegmentReplicationStates.get(index);
if (segmentReplicationStates == null || segmentReplicationStates.size() == 0) {
continue;
}
builder.startObject(index);
builder.startArray("shards");
for (SegmentReplicationStatsState segmentReplicationStatsState : segmentReplicationStatsStates) {
for (SegmentReplicationState segmentReplicationState : segmentReplicationStates) {
builder.startObject();
segmentReplicationStatsState.toXContent(builder, params);
segmentReplicationState.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
Expand All @@ -84,7 +84,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMapOfLists(shardSegmentReplicationStatsStates, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeMapOfLists(shardSegmentReplicationStates, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationStatsState;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -33,7 +33,7 @@
import java.util.List;
import java.util.Map;

public class TransportSegmentReplicationAction extends TransportBroadcastByNodeAction<SegmentReplicationRequest, SegmentReplicationResponse, SegmentReplicationStatsState> {
public class TransportSegmentReplicationAction extends TransportBroadcastByNodeAction<SegmentReplicationRequest, SegmentReplicationResponse, SegmentReplicationState> {

private final IndicesService indicesService;

Expand All @@ -58,8 +58,8 @@ public TransportSegmentReplicationAction(
}

@Override
protected SegmentReplicationStatsState readShardResult(StreamInput in) throws IOException {
return new SegmentReplicationStatsState(in);
protected SegmentReplicationState readShardResult(StreamInput in) throws IOException {
return new SegmentReplicationState(in);
}

@Override
Expand All @@ -68,23 +68,23 @@ protected SegmentReplicationResponse newResponse(
int totalShards,
int successfulShards,
int failedShards,
List<SegmentReplicationStatsState> responses,
List<SegmentReplicationState> responses,
List<DefaultShardOperationFailedException> shardFailures,
ClusterState clusterState
) {
Map<String, List<SegmentReplicationStatsState>> shardResponses = new HashMap<>();
for (SegmentReplicationStatsState segmentReplicationStatsState : responses) {
if (segmentReplicationStatsState == null) {
Map<String, List<SegmentReplicationState>> shardResponses = new HashMap<>();
for (SegmentReplicationState segmentReplicationState : responses) {
if (segmentReplicationState == null) {
continue;
}
String indexName = segmentReplicationStatsState.getShardRouting().getIndexName();
String indexName = segmentReplicationState.getShardRouting().getIndexName();
if (!shardResponses.containsKey(indexName)) {
shardResponses.put(indexName, new ArrayList<>());
}
if (request.activeOnly()) {
shardResponses.get(indexName).add(segmentReplicationStatsState);
shardResponses.get(indexName).add(segmentReplicationState);
} else {
shardResponses.get(indexName).add(segmentReplicationStatsState);
shardResponses.get(indexName).add(segmentReplicationState);
}
}
return new SegmentReplicationResponse(totalShards, successfulShards, failedShards, shardResponses, shardFailures);
Expand All @@ -96,10 +96,10 @@ protected SegmentReplicationRequest readRequestFrom(StreamInput in) throws IOExc
}

@Override
protected SegmentReplicationStatsState shardOperation(SegmentReplicationRequest request, ShardRouting shardRouting) {
protected SegmentReplicationState shardOperation(SegmentReplicationRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
return indexShard.getSegmentReplicationStatsState();
return indexShard.getSegmentReplicationState();
}

@Override
Expand Down
20 changes: 10 additions & 10 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationStatsState;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -124,7 +124,7 @@ public final class IndexModule {

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

private static final IndexStorePlugin.SegmentReplicationStatsStateFactory DEFAULT_SEGMENT_REPLICATION_STATS_STATE_FACTORY = SegmentReplicationStatsState::new;
private static final IndexStorePlugin.SegmentReplicationStateFactory DEFAULT_SEGMENT_REPLICATION_STATE_FACTORY = SegmentReplicationState ::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
"index.store.type",
Expand Down Expand Up @@ -200,7 +200,7 @@ public final class IndexModule {
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;

private final Map<String, IndexStorePlugin.SegmentReplicationStatsStateFactory> segmentReplicationStatsStateFactories;
private final Map<String, IndexStorePlugin.SegmentReplicationStateFactory> segmentReplicationStateFactories;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -220,7 +220,7 @@ public IndexModule(
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final Map<String, IndexStorePlugin.SegmentReplicationStatsStateFactory> segmentReplicationStatsStateFactories
final Map<String, IndexStorePlugin.SegmentReplicationStateFactory> segmentReplicationStateFactories
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -232,7 +232,7 @@ public IndexModule(
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.segmentReplicationStatsStateFactories = segmentReplicationStatsStateFactories;
this.segmentReplicationStateFactories = segmentReplicationStateFactories;
}

/**
Expand Down Expand Up @@ -508,7 +508,7 @@ public IndexService newIndexService(
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
final IndexStorePlugin.SegmentReplicationStatsStateFactory segmentReplicationStatsStateFactory = getSegmentReplicationStateFactory(indexSettings, segmentReplicationStatsStateFactories);
final IndexStorePlugin.SegmentReplicationStateFactory segmentReplicationStateFactory = getSegmentReplicationStateFactory(indexSettings, segmentReplicationStateFactories);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
boolean success = false;
Expand Down Expand Up @@ -557,7 +557,7 @@ public IndexService newIndexService(
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory,
segmentReplicationStatsStateFactory,
segmentReplicationStateFactory,
repositoriesServiceSupplier
);
success = true;
Expand Down Expand Up @@ -618,11 +618,11 @@ private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
return factory;
}

private static IndexStorePlugin.SegmentReplicationStatsStateFactory getSegmentReplicationStateFactory(
private static IndexStorePlugin.SegmentReplicationStateFactory getSegmentReplicationStateFactory(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.SegmentReplicationStatsStateFactory> segmentReplicationStateFactories
final Map<String, IndexStorePlugin.SegmentReplicationStateFactory> segmentReplicationStateFactories
) {
return DEFAULT_SEGMENT_REPLICATION_STATS_STATE_FACTORY;
return DEFAULT_SEGMENT_REPLICATION_STATE_FACTORY;
}

/**
Expand Down
14 changes: 6 additions & 8 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationStatsState;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -144,7 +144,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;

private final IndexStorePlugin.SegmentReplicationStatsStateFactory segmentReplicationStatsStateFactory;
private final IndexStorePlugin.SegmentReplicationStateFactory segmentReplicationStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
private final MapperService mapperService;
Expand Down Expand Up @@ -212,7 +212,7 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
IndexStorePlugin.SegmentReplicationStatsStateFactory segmentReplicationStatsStateFactory,
IndexStorePlugin.SegmentReplicationStateFactory segmentReplicationStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
super(indexSettings);
Expand Down Expand Up @@ -274,7 +274,7 @@ public IndexService(
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.segmentReplicationStatsStateFactory = segmentReplicationStatsStateFactory;
this.segmentReplicationStateFactory = segmentReplicationStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
Expand Down Expand Up @@ -546,7 +546,6 @@ public synchronized IndexShard createShard(
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
eventListener.onStoreCreated(shardId);
SegmentReplicationStatsState segmentReplicationStatsState = createSegmentReplicationStatsState(routing);
indexShard = new IndexShard(
routing,
this.indexSettings,
Expand All @@ -570,7 +569,6 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactory,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
segmentReplicationStatsState,
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down Expand Up @@ -669,8 +667,8 @@ public RecoveryState createRecoveryState(ShardRouting shardRouting, DiscoveryNod
return recoveryStateFactory.newRecoveryState(shardRouting, targetNode, sourceNode);
}

public SegmentReplicationStatsState createSegmentReplicationStatsState(ShardRouting shardRouting){
return segmentReplicationStatsStateFactory.newSegmentReplicationStatsState(shardRouting);
public SegmentReplicationState createSegmentReplicationState(ShardRouting shardRouting, DiscoveryNode node){
return segmentReplicationStateFactory.newSegmentReplicationState(shardRouting, node);
}

@Override
Expand Down
14 changes: 8 additions & 6 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.SegmentReplicationStatsState;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -279,7 +279,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile RecoveryState recoveryState;

@Nullable
private volatile SegmentReplicationStatsState segmentReplicationStatsState;
private volatile SegmentReplicationState segmentReplicationState;

private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
Expand Down Expand Up @@ -348,7 +348,6 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final SegmentReplicationStatsState segmentReplicationStatsState,
@Nullable final Store remoteStore
) throws IOException {
super(shardRouting.shardId(), indexSettings);
Expand Down Expand Up @@ -433,7 +432,6 @@ public boolean shouldCache(Query query) {
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.segmentReplicationStatsState = segmentReplicationStatsState;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
}
Expand Down Expand Up @@ -1483,8 +1481,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
return true;
}

public SegmentReplicationStatsState getSegmentReplicationStatsState(){
return this.segmentReplicationStatsState;
public SegmentReplicationState getSegmentReplicationState(){
return this.segmentReplicationState;
}

public void setSegmentReplicationState(SegmentReplicationState segmentReplicationState){
this.segmentReplicationState = segmentReplicationState;
}

/**
Expand Down
Loading

0 comments on commit 88cf749

Please sign in to comment.