Skip to content

Commit

Permalink
[ML] Persist data counts in DFA final step with ML origin (elastic#67674
Browse files Browse the repository at this point in the history
)

In elastic#67623 I moved persisting the data counts at the end of a
data frame analytics job into a `FinalStep` class. However,
I forgot to execute the index request with ML origin resulting
in authentication problems if the user that runs the DFA job
does not have read privileges in the ML stats index.

This commit fixes this by executing that index request with ML
origin.
  • Loading branch information
dimitris-athanasiou authored Jan 19, 2021
1 parent addb5cb commit 27f69d4
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -36,6 +36,7 @@
import java.util.Collections;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* The final step of a data frame analytics job.
Expand Down Expand Up @@ -80,7 +81,7 @@ private void indexDataCounts(ActionListener<IndexResponse> listener) {
.id(DataCounts.documentId(config.getId()))
.setRequireAlias(true)
.source(builder);
parentTaskClient().index(indexRequest, listener);
executeAsyncWithOrigin(parentTaskClient(), ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
} catch (IOException e) {
listener.onFailure(ExceptionsHelper.serverError("[{}] Error persisting final data counts", e, config.getId()));
}
Expand All @@ -97,10 +98,7 @@ private void refreshIndices(ActionListener<RefreshResponse> listener) {
LOGGER.debug(() -> new ParameterizedMessage("[{}] Refreshing indices {}", config.getId(),
Arrays.toString(refreshRequest.indices())));

ParentTaskAssigningClient parentTaskClient = parentTaskClient();
try (ThreadContext.StoredContext ignore = parentTaskClient.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
parentTaskClient.admin().indices().refresh(refreshRequest, listener);
}
executeAsyncWithOrigin(parentTaskClient(), ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, listener);
}

@Override
Expand Down

0 comments on commit 27f69d4

Please sign in to comment.