Skip to content

Commit

Permalink
Added IT for disk tier stats, plus spotlessApply
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Oct 23, 2023
1 parent 7d03562 commit c5099f1
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 112 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// This is a separate file from IndicesRequestCacheIT because we only want to run our test
// on a node with a maximum request cache size that we set.
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {

    /**
     * Verifies that entries evicted from the on-heap request cache tier spill over to the
     * disk tier, and that hit/miss statistics are tracked independently per tier:
     * every distinct query is a miss on both tiers, and re-running the oldest (evicted)
     * query misses on-heap but hits on disk.
     */
    public void testDiskTierStats() throws Exception {
        // NOTE(review): the 687 B per-entry size is empirical and may drift if the cached
        // value's serialization changes — confirm if this test becomes flaky.
        final int heapSizeBytes = 1800; // enough to fit 2 queries, as each is 687 B
        final int requestSize = 687; // each request is 687 B

        // Dedicated node with a small, explicit on-heap request cache cap so that
        // evictions to the disk tier happen after just a few queries.
        String node = internalCluster().startNode(
            Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
        );
        Client client = client(node);

        Settings.Builder indicesSettingBuilder = Settings.builder()
            .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

        assertAcked(
            client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
        );
        indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
        ensureSearchable("index");
        SearchResponse resp;

        // Issue enough distinct cacheable queries to fill the heap tier and push
        // numOnDisk entries down to the disk tier. Each new query is a miss on both tiers.
        int numOnDisk = 5;
        int numRequests = heapSizeBytes / requestSize + numOnDisk;
        for (int i = 0; i < numRequests; i++) {
            resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
            assertSearchResponse(resp);
            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
        }

        // The first request, for "hello0", should have been evicted to the disk tier:
        // repeating it is an on-heap miss but a disk hit.
        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get();
        assertSearchResponse(resp); // previously missing: validate the response before asserting stats
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false);
        IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.IndicesRequestIT;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.search.SearchResponse;
Expand Down Expand Up @@ -638,30 +637,13 @@ public void testProfileDisableCache() throws Exception {

public void testCacheWithInvalidation() throws Exception {
Client client = client();
//int heapSizeBytes = 2000; // enough to fit 2 queries, as each is 687 B

Settings.Builder builder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(2000));
// Why is it appending "index." to the beginning of the key??

String heapSizeBytes = builder.get(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey());
System.out.println("Current cap = " + heapSizeBytes);
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

client.admin().setSettings(Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(2000)));

assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
builder
)
.get()
);
assertAcked(client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(builder).get());
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
Expand All @@ -675,8 +657,8 @@ public void testCacheWithInvalidation() throws Exception {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1, TierType.ON_HEAP);
assertCacheState(client, "index", 0, 1, TierType.DISK);
assertCacheState(client, "index", 1, 1, TierType.ON_HEAP, false);
assertCacheState(client, "index", 0, 1, TierType.DISK, false);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP);

// Explicit refresh would invalidate cache
Expand All @@ -685,13 +667,21 @@ public void testCacheWithInvalidation() throws Exception {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2, TierType.ON_HEAP);
assertCacheState(client, "index", 0, 2, TierType.DISK);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was invalidated? (prob invalidation isnt in yet)
assertCacheState(client, "index", 1, 2, TierType.ON_HEAP, false);
assertCacheState(client, "index", 0, 2, TierType.DISK, false);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was
// invalidated? (prob invalidation isnt in yet)
// yeah - evictions = 0, its not in yet
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses, TierType tierType) {
protected static void assertCacheState(
Client client,
String index,
long expectedHits,
long expectedMisses,
TierType tierType,
boolean enforceZeroEvictions
) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
Expand All @@ -701,18 +691,32 @@ private static void assertCacheState(Client client, String index, long expectedH
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
System.out.println("mem size " + requestCacheStats.getMemorySize(tierType));
assertEquals(
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType), requestCacheStats.getEvictions(tierType))
);
ByteSizeValue memSize = requestCacheStats.getMemorySize(tierType);
if (memSize.getBytes() > 0) {
System.out.println("mem size " + memSize);
}
if (enforceZeroEvictions) {
assertEquals(
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(
requestCacheStats.getHitCount(tierType),
requestCacheStats.getMissCount(tierType),
requestCacheStats.getEvictions(tierType)
)
);
} else {
assertEquals(
Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType))
);
}
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP);
/**
 * Convenience overload preserving the historical assertion behavior: checks only the
 * ON_HEAP tier and additionally requires that no evictions have occurred
 * (enforceZeroEvictions = true).
 */
protected static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP, true);
}

private static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) {
protected static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.common.metrics;

import java.io.Serializable;
import java.util.concurrent.atomic.LongAdder;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
public class RequestCacheStats implements Writeable, ToXContentFragment {

private Map<String, StatsHolder> map;

public RequestCacheStats() {
this.map = new HashMap<>();
for (TierType tierType : TierType.values()) {
Expand All @@ -64,22 +65,15 @@ public RequestCacheStats() {
public RequestCacheStats(StreamInput in) throws IOException {
this();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need to be registered?
this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need
// to be registered?
} else {
// objects from earlier versions only contain on-heap info, and do not have entries info
long memorySize = in.readVLong();
long evictions = in.readVLong();
long hitCount = in.readVLong();
long missCount = in.readVLong();
this.map.put(
TierType.ON_HEAP.getStringValue(),
new StatsHolder(
memorySize,
evictions,
hitCount,
missCount,
0
));
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ public ShardRequestCache() {
public RequestCacheStats stats() {
// TODO: Change RequestCacheStats to support disk tier stats.
// Changing this function to return a RequestCacheStats with stats from all tiers.
//return stats(TierType.ON_HEAP);
// return stats(TierType.ON_HEAP);
return new RequestCacheStats(statsHolder);
}


public void onHit(TierType tierType) {
statsHolder.get(tierType).hitCount.inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class StatsHolder implements Serializable, Writeable, ToXContentFragment
final CounterMetric missCount;
final CounterMetric entries;


public StatsHolder() {
this.totalMetric = new CounterMetric();
this.evictionsMetric = new CounterMetric();
Expand All @@ -49,7 +48,7 @@ public StatsHolder(long memorySize, long evictions, long hitCount, long missCoun
this.entries.inc(entries);
}

public StatsHolder(StreamInput in) throws IOException {
public StatsHolder(StreamInput in) throws IOException {
// Read and write the values of the counter metrics. They should always be positive
// This object is new, so we shouldn't need version checks for different behavior
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
Expand Down Expand Up @@ -97,7 +96,11 @@ public long getEntries() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.humanReadableField(RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES, RequestCacheStats.Fields.MEMORY_SIZE, new ByteSizeValue(getMemorySize()));
builder.humanReadableField(
RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES,
RequestCacheStats.Fields.MEMORY_SIZE,
new ByteSizeValue(getMemorySize())
);
builder.field(RequestCacheStats.Fields.EVICTIONS, getEvictions());
builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount());
builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount());
Expand Down
2 changes: 0 additions & 2 deletions server/src/main/java/org/opensearch/indices/CachingTier.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

import org.opensearch.common.cache.RemovalListener;

import java.io.IOException;

/**
* A single tier (e.g. on-heap or disk) of the tiered indices request cache.
* @param <K>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
public class DummySerializableKey implements Serializable {
private Integer i;
private String s;

public DummySerializableKey(Integer i, String s) {
this.i = i;
this.s = s;
Expand All @@ -22,9 +23,11 @@ public DummySerializableKey(Integer i, String s) {
public int getI() {
return i;
}

public String getS() {
return s;
}

@Override
public boolean equals(Object o) {
if (o == this) {
Expand All @@ -36,6 +39,7 @@ public boolean equals(Object o) {
DummySerializableKey other = (DummySerializableKey) o;
return Objects.equals(this.i, other.i) && this.s.equals(other.s);
}

@Override
public final int hashCode() {
int result = 11;
Expand Down
Loading

0 comments on commit c5099f1

Please sign in to comment.