Skip to content

Commit

Permalink
Merge branch '2.x' into backport/backport-14454-to-2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S authored Sep 3, 2024
2 parents 092397a + 834b7c1 commit 7694036
Show file tree
Hide file tree
Showing 94 changed files with 6,375 additions and 1,082 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.Context;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentParser;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class GetIndexResponse {
private Map<String, Settings> settings;
private Map<String, Settings> defaultSettings;
private Map<String, String> dataStreams;
private Map<String, Context> contexts;
private String[] indices;

GetIndexResponse(
Expand All @@ -69,7 +71,8 @@ public class GetIndexResponse {
Map<String, List<AliasMetadata>> aliases,
Map<String, Settings> settings,
Map<String, Settings> defaultSettings,
Map<String, String> dataStreams
Map<String, String> dataStreams,
Map<String, Context> contexts
) {
this.indices = indices;
// to have deterministic order
Expand All @@ -89,6 +92,9 @@ public class GetIndexResponse {
if (dataStreams != null) {
this.dataStreams = dataStreams;
}
if (contexts != null) {
this.contexts = contexts;
}
}

public String[] getIndices() {
Expand Down Expand Up @@ -123,6 +129,10 @@ public Map<String, String> getDataStreams() {
return dataStreams;
}

public Map<String, Context> contexts() {
return contexts;
}

/**
* Returns the string value for the specified index and setting. If the includeDefaults flag was not set or set to
* false on the {@link GetIndexRequest}, this method will only return a value where the setting was explicitly set
Expand Down Expand Up @@ -167,6 +177,7 @@ private static IndexEntry parseIndexEntry(XContentParser parser) throws IOExcept
Settings indexSettings = null;
Settings indexDefaultSettings = null;
String dataStream = null;
Context context = null;
// We start at START_OBJECT since fromXContent ensures that
while (parser.nextToken() != Token.END_OBJECT) {
ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser);
Expand All @@ -185,6 +196,9 @@ private static IndexEntry parseIndexEntry(XContentParser parser) throws IOExcept
case "defaults":
indexDefaultSettings = Settings.fromXContent(parser);
break;
case "context":
context = Context.fromXContent(parser);
break;
default:
parser.skipChildren();
}
Expand All @@ -197,7 +211,7 @@ private static IndexEntry parseIndexEntry(XContentParser parser) throws IOExcept
parser.skipChildren();
}
}
return new IndexEntry(indexAliases, indexMappings, indexSettings, indexDefaultSettings, dataStream);
return new IndexEntry(indexAliases, indexMappings, indexSettings, indexDefaultSettings, dataStream, context);
}

// This is just an internal container to make stuff easier for returning
Expand All @@ -207,19 +221,22 @@ private static class IndexEntry {
Settings indexSettings = Settings.EMPTY;
Settings indexDefaultSettings = Settings.EMPTY;
String dataStream;
Context context;

IndexEntry(
List<AliasMetadata> indexAliases,
MappingMetadata indexMappings,
Settings indexSettings,
Settings indexDefaultSettings,
String dataStream
String dataStream,
Context context
) {
if (indexAliases != null) this.indexAliases = indexAliases;
if (indexMappings != null) this.indexMappings = indexMappings;
if (indexSettings != null) this.indexSettings = indexSettings;
if (indexDefaultSettings != null) this.indexDefaultSettings = indexDefaultSettings;
if (dataStream != null) this.dataStream = dataStream;
if (context != null) this.context = context;
}
}

Expand All @@ -229,6 +246,7 @@ public static GetIndexResponse fromXContent(XContentParser parser) throws IOExce
Map<String, Settings> settings = new HashMap<>();
Map<String, Settings> defaultSettings = new HashMap<>();
Map<String, String> dataStreams = new HashMap<>();
Map<String, Context> contexts = new HashMap<>();
List<String> indices = new ArrayList<>();

if (parser.currentToken() == null) {
Expand All @@ -254,12 +272,15 @@ public static GetIndexResponse fromXContent(XContentParser parser) throws IOExce
if (indexEntry.dataStream != null) {
dataStreams.put(indexName, indexEntry.dataStream);
}
if (indexEntry.context != null) {
contexts.put(indexName, indexEntry.context);
}
} else if (parser.currentToken() == Token.START_ARRAY) {
parser.skipChildren();
} else {
parser.nextToken();
}
}
return new GetIndexResponse(indices.toArray(new String[0]), mappings, aliases, settings, defaultSettings, dataStreams);
return new GetIndexResponse(indices.toArray(new String[0]), mappings, aliases, settings, defaultSettings, dataStreams, contexts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.client.AbstractResponseTestCase;
import org.opensearch.client.GetAliasesResponseTests;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.Context;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -66,6 +67,7 @@ protected org.opensearch.action.admin.indices.get.GetIndexResponse createServerT
final Map<String, Settings> settings = new HashMap<>();
final Map<String, Settings> defaultSettings = new HashMap<>();
final Map<String, String> dataStreams = new HashMap<>();
final Map<String, Context> contexts = new HashMap<>();
IndexScopedSettings indexScopedSettings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS;
boolean includeDefaults = randomBoolean();
for (String index : indices) {
Expand All @@ -90,14 +92,19 @@ protected org.opensearch.action.admin.indices.get.GetIndexResponse createServerT
if (randomBoolean()) {
dataStreams.put(index, randomAlphaOfLength(5).toLowerCase(Locale.ROOT));
}

if (randomBoolean()) {
contexts.put(index, new Context(randomAlphaOfLength(5).toLowerCase(Locale.ROOT)));
}
}
return new org.opensearch.action.admin.indices.get.GetIndexResponse(
indices,
mappings,
aliases,
settings,
defaultSettings,
dataStreams
dataStreams,
null
);
}

Expand All @@ -116,6 +123,7 @@ protected void assertInstances(
assertEquals(serverTestInstance.getSettings(), clientInstance.getSettings());
assertEquals(serverTestInstance.defaultSettings(), clientInstance.getDefaultSettings());
assertEquals(serverTestInstance.getAliases(), clientInstance.getAliases());
assertEquals(serverTestInstance.contexts(), clientInstance.contexts());
}

private static MappingMetadata createMappingsForIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,23 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.applicationtemplates.ClusterStateSystemTemplateLoader;
import org.opensearch.cluster.applicationtemplates.SystemTemplate;
import org.opensearch.cluster.applicationtemplates.SystemTemplateMetadata;
import org.opensearch.cluster.applicationtemplates.TemplateRepositoryMetadata;
import org.opensearch.cluster.metadata.Context;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.RangeQueryBuilder;
Expand All @@ -58,7 +66,10 @@
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -429,4 +440,53 @@ public void testCreateIndexWithNullReplicaCountPickUpClusterReplica() {
);
}
}

public void testCreateIndexWithContextSettingsAndTemplate() throws Exception {
int numReplicas = 1;
String indexName = "test-idx-1";
Settings settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), (String) null)
.build();
Context context = new Context("test");

String templateContent = "{\n"
+ " \"template\": {\n"
+ " \"settings\": {\n"
+ " \"merge.policy\": \"log_byte_size\"\n"
+ " }\n"
+ " },\n"
+ " \"_meta\": {\n"
+ " \"_type\": \"@abc_template\",\n"
+ " \"_version\": 1\n"
+ " },\n"
+ " \"version\": 1\n"
+ "}\n";

ClusterStateSystemTemplateLoader loader = new ClusterStateSystemTemplateLoader(
internalCluster().clusterManagerClient(),
() -> internalCluster().getInstance(ClusterService.class).state()
);
loader.loadTemplate(
new SystemTemplate(
BytesReference.fromByteBuffer(ByteBuffer.wrap(templateContent.getBytes(StandardCharsets.UTF_8))),
SystemTemplateMetadata.fromComponentTemplateInfo("test", 1L),
new TemplateRepositoryMetadata(UUID.randomUUID().toString(), 1L)
)
);

assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(settings).setContext(context).get());

IndicesService indicesService = internalCluster().getInstance(IndicesService.class, internalCluster().getClusterManagerName());

for (IndexService indexService : indicesService) {
assertEquals(indexName, indexService.index().getName());
assertEquals(
numReplicas,
(int) indexService.getIndexSettings().getSettings().getAsInt(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, null)
);
assertEquals(context, indexService.getMetadata().context());
assertEquals("log_byte_size", indexService.getMetadata().getSettings().get(IndexSettings.INDEX_MERGE_POLICY.getKey()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -109,13 +110,16 @@ public void cleanUp() throws Exception {
assertAcked(
client().admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN).get()
);
assertBusy(() -> {
try {
assertEquals(0, getFileCount(translogRepoPath));
} catch (IOException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
// With pinned timestamp, we can have tlog files even after deletion.
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
assertBusy(() -> {
try {
assertEquals(0, getFileCount(translogRepoPath));
} catch (IOException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
}
super.teardown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Loading

0 comments on commit 7694036

Please sign in to comment.