Skip to content

Commit

Permalink
- Warning when synonyms are over the limit on read
Browse files Browse the repository at this point in the history
- Allow updating an already existing rule at max capacity
- Remove index setting for now
  • Loading branch information
carlosdelest committed Jun 20, 2024
1 parent 20e8550 commit 8efeff1
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

package org.elasticsearch.synonyms;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
Expand All @@ -20,7 +22,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.synonyms.SynonymsTestUtils.randomSynonymRule;
import static org.elasticsearch.action.synonyms.SynonymsTestUtils.randomSynonymsSet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class SynonymsManagementAPIServiceIT extends ESIntegTestCase {

Expand Down Expand Up @@ -185,10 +192,45 @@ public void testUpdateRuleWithMaxSynonyms() throws InterruptedException {
@Override
public void onResponse(SynonymsManagementAPIService.SynonymsReloadResult synonymsReloadResult) {
// Updating an already existing rule must succeed, even when the synonyms set is at max capacity
synonymsManagementAPIService.putSynonymRule(synonymSetId, synonymsSet[0], new ActionListener<>() {
synonymsManagementAPIService.putSynonymRule(
synonymSetId,
synonymsSet[randomIntBetween(0, maxSynonymSets)],
new ActionListener<>() {
@Override
public void onResponse(SynonymsManagementAPIService.SynonymsReloadResult synonymsReloadResult) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("Should update a rule that already exists at max capcity");
}
}
);
}

@Override
public void onFailure(Exception e) {
fail(e);
}
});

latch.await(5, TimeUnit.SECONDS);
}

public void testCreateRuleWithMaxSynonyms() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
String synonymSetId = randomIdentifier();
String ruleId = randomIdentifier();
SynonymRule[] synonymsSet = randomSynonymsSet(maxSynonymSets, maxSynonymSets);
synonymsManagementAPIService.putSynonymsSet(synonymSetId, synonymsSet, new ActionListener<>() {
@Override
public void onResponse(SynonymsManagementAPIService.SynonymsReloadResult synonymsReloadResult) {
// Creating a new rule must fail, as the synonyms set is already at max capacity
synonymsManagementAPIService.putSynonymRule(synonymSetId, randomSynonymRule(ruleId), new ActionListener<>() {
@Override
public void onResponse(SynonymsManagementAPIService.SynonymsReloadResult synonymsReloadResult) {
fail("Shouldn't have been able to update a rule with max synonyms");
fail("Should not create a new rule that does not exist when at max capacity");
}

@Override
Expand All @@ -206,4 +248,44 @@ public void onFailure(Exception e) {

latch.await(5, TimeUnit.SECONDS);
}

/**
 * Checks that reading a synonyms set that holds more rules than the allowed maximum logs a warning.
 * <p>
 * The rules are written through {@code bulkUpdateSynonymsSet}, which the test uses to store more rules than
 * {@code maxSynonymSets}. The service logger is then replaced by a mock so the warning can be verified,
 * and is restored afterwards so the static field does not leak the mock into other tests.
 */
public void testTooManySynonymsOnIndexTriggersWarning() throws InterruptedException {
    CountDownLatch insertLatch = new CountDownLatch(1);
    String synonymSetId = randomIdentifier();
    synonymsManagementAPIService.bulkUpdateSynonymsSet(
        synonymSetId,
        randomSynonymsSet(atLeast(maxSynonymSets + 1)),
        new ActionListener<>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                insertLatch.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                fail(e);
            }
        }
    );

    // await() returns false on timeout; without this check the test would carry on against a half-written index
    if (insertLatch.await(5, TimeUnit.SECONDS) == false) {
        fail("Timed out waiting for the synonyms set to be indexed");
    }

    // The logger is a static field: keep the original so it can be put back once the test is done
    Logger originalLogger = SynonymsManagementAPIService.logger;
    Logger logger = mock(Logger.class);
    SynonymsManagementAPIService.logger = logger;
    try {
        CountDownLatch readLatch = new CountDownLatch(1);
        synonymsManagementAPIService.getSynonymSetRules(synonymSetId, new ActionListener<>() {
            @Override
            public void onResponse(PagedResult<SynonymRule> synonymRulePagedResult) {
                readLatch.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                // Reading is expected to work; only a warning should be emitted
                fail("Should have been able to retrieve synonyms");
            }
        });

        if (readLatch.await(5, TimeUnit.SECONDS) == false) {
            fail("Timed out waiting for the synonym rules to be retrieved");
        }
        verify(logger).warn(anyString(), eq(synonymSetId));
    } finally {
        SynonymsManagementAPIService.logger = originalLogger;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.indices.analyze.TransportReloadAnalyzersAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -34,7 +35,7 @@
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
Expand All @@ -44,6 +45,7 @@
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand All @@ -66,7 +68,6 @@
*/
public class SynonymsManagementAPIService {

private static final Logger logger = LogManager.getLogger(SynonymsManagementAPIService.class);
private static final String SYNONYMS_INDEX_NAME_PATTERN = ".synonyms-*";
private static final int SYNONYMS_INDEX_FORMAT = 2;
private static final String SYNONYMS_INDEX_CONCRETE_NAME = ".synonyms-" + SYNONYMS_INDEX_FORMAT;
Expand All @@ -83,12 +84,15 @@ public class SynonymsManagementAPIService {
// Identifies synonym set objects stored in the index
private static final String SYNONYM_SET_OBJECT_TYPE = "synonym_set";
private static final String SYNONYM_RULE_ID_SEPARATOR = "|";
private static final int MAX_SYNONYMS_SETS = 100_000;
private static final int MAX_SYNONYMS_SETS = 10_000;
private static final String SYNONYM_RULE_ID_FIELD = SynonymRule.ID_FIELD.getPreferredName();
private static final String SYNONYM_SETS_AGG_NAME = "synonym_sets_aggr";
private static final int SYNONYMS_INDEX_MAPPINGS_VERSION = 1;
private final int maxSynonymsSets;

// Package private for testing
static Logger logger = LogManager.getLogger(SynonymsManagementAPIService.class);

private final Client client;

public static final String SYNONYMS_ORIGIN = "synonyms";
Expand Down Expand Up @@ -214,33 +218,21 @@ public void onFailure(Exception e) {
}

/**
* Retrieves all synonym rules for a synonym set. It checks that max_result_window is not exceeded for the index, and
* adjusts the number of synonyms sets returned if needed.
* Retrieves all synonym rules for a synonym set.
*
* @param synonymSetId
* @param listener
*/
public void getSynonymSetRules(String synonymSetId, ActionListener<PagedResult<SynonymRule>> listener) {
// Check the max result window setting to limit the number of synonym rules retrieved.
// This is needed for mixed cluster environments that might not have updated the index setting when doing shard recovering
client.admin()
.indices()
.prepareGetIndex()
.setIndices(SYNONYMS_ALIAS_NAME)
.execute(listener.delegateFailureAndWrap((getIndexListener, getIndexResponse) -> {
Map<String, Settings> settings = getIndexResponse.getSettings();
int size = IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(null);

Settings maxResultWindowSetting = settings.get(SYNONYMS_ALIAS_NAME);
if (maxResultWindowSetting != null) {
Integer synonymsResultWindowSize = maxResultWindowSetting.getAsInt(
IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(maxResultWindowSetting)
);
size = Math.max(size, synonymsResultWindowSize);
// Check the number of synonym sets, and issue a warning in case there are more than the maximum allowed
client.prepareSearch(SYNONYMS_ALIAS_NAME)
.setSource(new SearchSourceBuilder().size(0).trackTotalHits(true))
.execute(listener.delegateFailureAndWrap((searchListener, countResponse) -> {
long totalSynonymRules = countResponse.getHits().getTotalHits().value;
if (totalSynonymRules > maxSynonymsSets) {
logger.warn("The number of synonym rules in the synonym set [{}] exceeds the maximum allowed", synonymSetId);
}

getSynonymSetRules(synonymSetId, 0, size, listener);
getSynonymSetRules(synonymSetId, 0, MAX_SYNONYMS_SETS, listener);
}));
}

Expand Down Expand Up @@ -326,20 +318,10 @@ public void putSynonymsSet(String synonymSetId, SynonymRule[] synonymsSet, Actio
}

// Insert as bulk requests
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
try {
// Insert synonym set object
bulkRequestBuilder.add(createSynonymSetIndexRequest(synonymSetId));
// Insert synonym rules
for (SynonymRule synonymRule : synonymsSet) {
bulkRequestBuilder.add(createSynonymRuleIndexRequest(synonymSetId, synonymRule));
}
} catch (IOException ex) {
listener.onFailure(ex);
}

bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(deleteByQueryResponseListener.delegateFailure((bulkInsertResponseListener, bulkInsertResponse) -> {
bulkUpdateSynonymsSet(
synonymSetId,
synonymsSet,
deleteByQueryResponseListener.delegateFailure((bulkInsertResponseListener, bulkInsertResponse) -> {
if (bulkInsertResponse.hasFailures()) {
logUniqueFailureMessagesWithIndices(
Arrays.stream(bulkInsertResponse.getItems())
Expand All @@ -357,26 +339,46 @@ public void putSynonymsSet(String synonymSetId, SynonymRule[] synonymsSet, Actio
: UpdateSynonymsResultStatus.UPDATED;

reloadAnalyzers(synonymSetId, false, bulkInsertResponseListener, updateSynonymsResultStatus);
}));
})
);
}));
}

/**
 * Indexes the synonym set object and all of its rules with a single bulk request that refreshes immediately.
 * <p>
 * This method does not check the number of rules against any limit. It is package private so tests can
 * add more synonym rules than the limit allows for.
 *
 * @param synonymSetId id of the synonym set the rules belong to
 * @param synonymsSet  synonym rules to index
 * @param listener     notified with the bulk response, or with the {@link IOException} raised while building
 *                     the index requests
 */
void bulkUpdateSynonymsSet(String synonymSetId, SynonymRule[] synonymsSet, ActionListener<BulkResponse> listener) {
    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
    try {
        // Insert synonym set object
        bulkRequestBuilder.add(createSynonymSetIndexRequest(synonymSetId));
        // Insert synonym rules
        for (SynonymRule synonymRule : synonymsSet) {
            bulkRequestBuilder.add(createSynonymRuleIndexRequest(synonymSetId, synonymRule));
        }
    } catch (IOException ex) {
        listener.onFailure(ex);
        // Stop here: executing the partially built request would index incomplete data
        // and complete the listener a second time
        return;
    }

    bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute(listener);
}

public void putSynonymRule(String synonymsSetId, SynonymRule synonymRule, ActionListener<SynonymsReloadResult> listener) {
checkSynonymSetExists(synonymsSetId, listener.delegateFailureAndWrap((l1, obj) -> {
// Count synonym rules to check if we're at maximum
BoolQueryBuilder queryFilter = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(SYNONYMS_SET_FIELD, synonymsSetId))
.filter(QueryBuilders.termQuery(OBJECT_TYPE_FIELD, SYNONYM_RULE_OBJECT_TYPE));
if (synonymRule.id() != null) {
// Remove the current synonym rule from the count, so we allow updating a rule at max capacity
queryFilter.mustNot(QueryBuilders.termQuery(SYNONYM_RULE_ID_FIELD, synonymRule.id()));
}
client.prepareSearch(SYNONYMS_ALIAS_NAME)
.setQuery(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(SYNONYMS_SET_FIELD, synonymsSetId))
.filter(QueryBuilders.termQuery(OBJECT_TYPE_FIELD, SYNONYM_RULE_OBJECT_TYPE))
)
.setQuery(queryFilter)
.setSize(0)
.setPreference(Preference.LOCAL.type())
.setTrackTotalHits(true)
.execute(l1.delegateFailureAndWrap((searchListener, searchResponse) -> {
long synonymsSetSize = searchResponse.getHits().getTotalHits().value;
if (synonymsSetSize >= maxSynonymsSets) {
// We could potentially update a synonym rule when we're at max capacity, but we're keeping this simple
listener.onFailure(
new IllegalArgumentException("The number of synonym rules in a synonyms set cannot exceed " + maxSynonymsSets)
);
Expand Down Expand Up @@ -560,7 +562,6 @@ static Settings settings() {
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
.put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), SYNONYMS_INDEX_FORMAT)
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), MAX_SYNONYMS_SETS)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static SynonymRule randomSynonymRule() {
return randomSynonymRule(randomBoolean() ? null : randomIdentifier());
}

static SynonymRule randomSynonymRule(String id) {
/**
 * Creates a synonym rule with the given id and randomly generated synonyms.
 * <p>
 * The synonyms string joins between 1 and 10 values with {@code ", "}; each value comes from
 * {@code randomAlphaOfLengthBetween(1, 10)}.
 *
 * @param id identifier passed as is to the {@link SynonymRule} constructor
 * @return the new synonym rule
 */
public static SynonymRule randomSynonymRule(String id) {
    String[] terms = randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(1, 10));
    String synonyms = String.join(", ", terms);
    return new SynonymRule(id, synonyms);
}

Expand Down

0 comments on commit 8efeff1

Please sign in to comment.