Skip to content

Commit

Permalink
passing user to validation search calls
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Sep 5, 2024
1 parent ec16d53 commit 78770cf
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -155,14 +156,28 @@ public SearchFeatureDao(
* @param listener onResponse is called with the epoch time of the latest data under the detector
*/
public void getLatestDataTime(Config config, Optional<Entity> entity, AnalysisType context, ActionListener<Optional<Long>> listener) {
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery();
getLatestDataTime(null, config, entity, context, listener);
}

/**
* Returns to listener the epoch time of the latest data under the detector.
*
* @param config info about the data
* @param listener onResponse is called with the epoch time of the latest data under the detector
*/
public void getLatestDataTime(
User user,
Config config,
Optional<Entity> entity,
AnalysisType context,
ActionListener<Optional<Long>> listener
) {
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery();
if (entity.isPresent()) {
for (TermQueryBuilder term : entity.get().getTermQueryForCustomerIndex()) {
internalFilterQuery.filter(term);
}
}

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(internalFilterQuery)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(config.getTimeField()))
Expand All @@ -172,15 +187,27 @@ public void getLatestDataTime(Config config, Optional<Entity> entity, AnalysisTy
.wrap(response -> listener.onResponse(ParseUtils.getLatestDataTime(response)), listener::onFailure);
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
config.getId(),
client,
context,
searchResponseListener
);
if (user != null) {
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
user,
client,
context,
searchResponseListener
);
} else {
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
config.getId(),
client,
context,
searchResponseListener
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,

public static final String NAME_REGEX = "[a-zA-Z0-9._-]+";
public static final Integer MAX_NAME_SIZE = 64;
public static final String CATEGORY_NOT_FOUND_ERR_MSG = "Can't find the categorical field %s";
public static final String CATEGORY_NOT_FOUND_ERR_MSG = "Can't find the categorical field %s in index %s";

public static String INVALID_NAME_SIZE = "Name should be shortened. The maximum limit is "
+ AbstractTimeSeriesActionHandler.MAX_NAME_SIZE
Expand Down Expand Up @@ -562,11 +562,11 @@ protected void validateCategoricalFieldsInAllIndices(String configId, boolean in

Iterator<Map.Entry<String, List<String>>> iterator = clusterIndicesMap.entrySet().iterator();

validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener);
validateCategoricalField(iterator, configId, indexingDryRun, listener);

}

protected void validateCategoricalFieldRecursive(
protected void validateCategoricalField(
Iterator<Map.Entry<String, List<String>>> iterator,
String configId,
boolean indexingDryRun,
Expand Down Expand Up @@ -645,13 +645,19 @@ protected void validateCategoricalFieldRecursive(
listener
.onFailure(
createValidationException(
String.format(Locale.ROOT, CATEGORY_NOT_FOUND_ERR_MSG, categoryField0),
String
.format(
Locale.ROOT,
CATEGORY_NOT_FOUND_ERR_MSG,
categoryField0,
Arrays.toString(clusterIndicesEntry.getValue().toArray(new String[0]))
),
ValidationIssueType.CATEGORY
)
);
return;
}
validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener);
validateCategoricalField(iterator, configId, indexingDryRun, listener);

}, error -> {
String message = String.format(Locale.ROOT, CommonMessages.FAIL_TO_GET_MAPPING_MSG, config.getIndices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ public void onFailure(Exception e) {
* interval can be a practical approach in scenarios where data arrival is irregular and there's
* a need to balance between capturing data features and avoiding over-sampling.
*
* @param topEntity top entity to use
* @param timeStampBounds Used to determine start and end date range to search for data
* @param listener returns minimum interval
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public LatestTimeRetriever(
* @param listener to return latest time and entity attributes if the config is HC
*/
public void checkIfHC(ActionListener<Pair<Optional<Long>, Map<String, Object>>> listener) {
searchFeatureDao.getLatestDataTime(config, Optional.empty(), context, ActionListener.wrap(latestTime -> {
searchFeatureDao.getLatestDataTime(user, config, Optional.empty(), context, ActionListener.wrap(latestTime -> {
if (latestTime.isEmpty()) {
listener.onResponse(Pair.of(Optional.empty(), Collections.emptyMap()));
} else if (config.isHighCardinality()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public void start() {
latestEntityAttributes.getRight()
),
exception -> {
listener.onFailure(exception);
logger.error("Failed to create search request for last data point", exception);
listener.onFailure(exception);
}
);
latestTimeRetriever.checkIfHC(latestTimeListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public static HashMap<String, List<String>> separateClusterIndexes(List<String>
Pair<String, String> clusterAndIndex = parseClusterAndIndexName(index);
String clusterName = clusterAndIndex.getKey();
String indexName = clusterAndIndex.getValue();
logger.info("clusterName: " + clusterName);
logger.info("indexName: " + indexName);

// If the index entry does not have a cluster_name, it indicates the index is on the local cluster.
if (clusterName.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@ public void testValidateAnomalyDetectorWithWrongCategoryField() throws Exception
.extractValue("detector", responseMap);
assertEquals(
"non-existing category",
String.format(Locale.ROOT, AbstractTimeSeriesActionHandler.CATEGORY_NOT_FOUND_ERR_MSG, "host.keyword"),
String.format(Locale.ROOT, AbstractTimeSeriesActionHandler.CATEGORY_NOT_FOUND_ERR_MSG, "host.keyword", "[index-test]"),
messageMap.get("category_field").get("message")
);

Expand Down

0 comments on commit 78770cf

Please sign in to comment.