[Data Frame] Refactor PUT transform to not create a task #39934
Conversation
Refactor PUT transform such that:
* POST _start creates the task and starts it
* GET transforms queries docs instead of tasks
* POST _stop verifies the stored config exists before trying to stop the task
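To make the new lifecycle concrete, here is a minimal sketch of the resulting API flow using the low-level REST client, in the same style as the tests below. The _data_frame/transforms endpoint paths and the transformConfigJson payload are assumptions for illustration, not taken from this PR; client() is assumed to come from an ESRestTestCase.

// Assumed endpoint paths; with this refactor PUT only stores the config document.
Request put = new Request("PUT", "_data_frame/transforms/my_transform");
put.setJsonEntity(transformConfigJson); // hypothetical config payload
client().performRequest(put);           // no persistent task is created here

// _start is now responsible for creating and starting the persistent task.
client().performRequest(new Request("POST", "_data_frame/transforms/my_transform/_start"));

// GET is served from the stored config docs, not from the task list.
client().performRequest(new Request("GET", "_data_frame/transforms/my_transform"));

// _stop first verifies the stored config exists, then stops the task.
client().performRequest(new Request("POST", "_data_frame/transforms/my_transform/_stop"));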
Pinging @elastic/ml-core
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
Nice! Added some comments
@@ -47,6 +47,7 @@ public void testUsage() throws IOException {
// create a transform
createPivotReviewsTransform("test_usage", "pivot_reviews", null);
startAndWaitForTransform("test_usage", "pivot_reviews");
Is this required now? If so, it looks like a bug to me: usage should report the number of transforms created whether they are started or not. In other words, this test should pass without this line.
@hendrikmuhs, I will have to update the usage API with this change then. I will do that and remove this line.
Actually, @hendrikmuhs, looking at DataFrameFeatureSet#usage, it calls the stats endpoint, and until stats are stored in a doc, we need the task to exist. I will mark this with a TODO to signify such.
You mean to get this?
assertEquals(0, XContentMapValues.extractValue("data_frame.stats.index_failures", usageAsMap));
Agreed, this requires the state and I think it's fine to call startAndWaitForTransform in order to get it.
However this test:
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
should work.
What about:
createPivotReviewsTransform("test_usage", "pivot_reviews", null);
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
// we should see the job
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
// but no stats
assertEquals(null, XContentMapValues.extractValue("data_frame.stats", usageAsMap));
// start it
startAndWaitForTransform("test_usage", "pivot_reviews");
// get usage again
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
// we should see some stats
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(0, XContentMapValues.extractValue("data_frame.stats.index_failures", usageAsMap));
assertEquals(???, XContentMapValues.extractValue("data_frame.stats.documents_indexed", usageAsMap));
@hendrikmuhs, what makes this difficult is that there is no way to get stats at all outside of the allocated persistent task, as they are stored in Indexer.getStats(). Consequently, if the process crashed and had to move to another node, all previous stats would be lost.
I will refactor the usage endpoint, but it will immediately have to be refactored again when we store stats somewhere outside of the running state of the allocated task on the node.
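As a speculative sketch of that future direction (not part of this PR): persisting the live indexer stats to a document would let a relocated task recover them. The index name, doc id scheme, and the client, transformId, indexer, and logger variables here are all hypothetical, assumed to be in scope.

// Hypothetical: write the live indexer stats to an internal index so they
// survive a crash or task relocation.
IndexRequest statsDoc = new IndexRequest(".data-frame-internal") // illustrative index name
    .id("data_frame_stats-" + transformId)
    .source(indexer.getStats().toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
client.index(statsDoc, ActionListener.wrap(
    r -> {},                                               // persisted successfully
    e -> logger.warn("failed to persist transform stats", e)));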
Additionally, I am not sure you want to get all non-running transforms...a hearkening back to your worries of 10k+ configurations.
👍 and now the fun begins! Excellent find! This is called regularly if usage collection is enabled.
If I remember correctly, this summarizes the overall transforms and transforms per state. Makes sense to me; I think we should keep doing this. But now that we store everything in an index we can refactor (this is pre-index code).
Anyway, can you put this into an issue? No need to solve this as part of this PR.
Additionally, I think this isn't only a problem here; other parts of the code count in a similar fashion.
@hendrikmuhs definitely, I will open an issue. I think if we move stats to an index, aggregations will solve this for us. There may be some wonkiness around transforms that are yet to have a task created (do we create a blank stats document when the transform is created?).
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else {
    putDataFrame(config, listener);
nit: reading this is a bit counter-intuitive, as I wondered, "hey, the other case doesn't call putDataFrame"; I now see it is wrapped. Maybe add a comment here, e.g.: "no need to check access rights, go straight to creation"
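Applied, the suggestion would read roughly like this. Only the added comment is the actual suggestion; the surrounding security check is paraphrased, and securityEnabled is a hypothetical condition name.

if (securityEnabled) { // paraphrased condition, not the actual check
    // destination-index privileges are verified first; privResponseListener
    // calls putDataFrame once the HasPrivileges response comes back
    client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else {
    // no need to check access rights, go straight to creation
    putDataFrame(config, listener);
}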
...in/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java
...ain/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java
...ain/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java
searchResponse -> {
    List<DataFrameTransformConfig> configs = new ArrayList<>(searchResponse.getHits().getHits().length);
    for (SearchHit hit : searchResponse.getHits().getHits()) {
        DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(),
I am worried: what's the size of 10000 config objects? I would not be worried about the XContent, but we parse all of those and create objects, including nested objects like AggregationBuilders, QueryBuilders, etc. In addition, for GET we parse every query but do not use it.
What about having different methods? In the end we have 2 use cases (see the sketch after this list):
- getting exactly 1 config for the purpose of using it (indexer)
- getting the configuration of 1 or more configs for the purpose of returning the XContent of it (GET); if possible avoid object creations, but at least do not keep 10k objects in a list
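A rough sketch of that split; the method names and signatures are illustrative, not the actual DataFrameTransformsConfigManager API (the DataFrameTransformConfig import is elided, as its package moved during this refactor).

import java.util.List;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;

// Use case 1 (indexer): exactly one config, fully parsed into a rich object.
void getTransformConfiguration(String transformId,
                               ActionListener<DataFrameTransformConfig> listener) {
    // GET the single config doc and parse it, queries and aggs included
}

// Use case 2 (GET endpoint): one or more configs rendered back out as XContent.
// Hand back the raw source references so 10k rich objects (QueryBuilders,
// AggregationBuilders, ...) are never materialized just to be re-serialized.
void getTransformConfigurationSources(String transformIdPattern,
                                      ActionListener<List<BytesReference>> listener) {
    // search the config index and collect hit.getSourceRef() without parsing
}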
Making this change would make data frame transforms different to every other endpoint in the code base.
Remember also that in the transport client the GET endpoint needs to return valid objects, not XContent.
If 10000 is too big then the solution that fits with the way everything else works would be to implement pagination and restrict the maximum page size to something less than 10000. But I think that should be done in a different PR (if at all).
I am adding pagination to this endpoint (and stats) eventually and am planning on doing that in a separate PR.
@droberts195 @benwtrent I am unsure if you understood my point. This wasn't about endpoints but about the methods in this class. Imagine we really have 10k transforms: we are creating a list of 10k objects, and those objects have inner objects. These 10k "rich objects" are temporary objects, as you call toXContent() on them at the end. Only the string representation is needed; the parsed objects are just side artifacts.
It's correct that the response object has to support wire serialization; however, the inner implementation of the response object is completely up to us. There is no need to store a list of DataFrameTransformConfig objects. The requirements of a response object can be fulfilled without storing these "rich objects"[*].
Pagination doesn't solve the problem as long as we keep the size of a single page at 10k (see line 142).
[*] That said, I simply do not see a need to return 10k configs at a time. I do not even see the need to solve the >10k transforms use case in 7.1.
I am fine with "fixing this in an upcoming PR", but I strongly disagree with doing it this way. At line 142 we explicitly set the limit to 10k, and as explained above I do not think the implementation is ready to support 10k due to the high memory usage it requires. We should set a lower limit there for the time being and we can "fix this in an upcoming PR". We should not put the system at risk for this! Progress over perfection, but in a safe way.
Pagination in combination with a smaller page size sounds like a good alternative; again, getting 10k configs at a time, I do not see a need for it.
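For illustration, a minimal sketch of a response object that satisfies both the wire and XContent contracts while holding only raw source bytes. The class name and the JSON shape it emits are hypothetical, not from this PR.

import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;

// Hypothetical response: wire-serializable and XContent-renderable without
// ever parsing the stored configs into rich objects.
final class RawConfigsResponse extends ActionResponse implements ToXContentObject {
    private final List<BytesReference> sources; // raw _source per matching config

    RawConfigsResponse(List<BytesReference> sources) {
        this.sources = sources;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVInt(sources.size());
        for (BytesReference source : sources) {
            out.writeBytesReference(source); // raw bytes over the wire
        }
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject().startArray("transforms");
        for (BytesReference source : sources) {
            // re-emit the stored JSON verbatim, without parsing it first
            builder.rawValue(source.streamInput(), XContentType.JSON);
        }
        return builder.endArray().endObject();
    }
}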
@hendrikmuhs when searching and getting a result, we will indicate the size and page parameters given (after another refactor to move common code into xpack.core, see #39976). These paging params will default to size: 100 and start: 0, respectively.
Addendum: I am against passing around strings and maps more than necessary. This sacrifices one of the securities of having a statically typed language. I think worrying about this type of serialization overhead is premature optimization at the cost of compile-time guarantees.
I am fine with lowering the default in this PR for now, but I think the chances of this bringing down a cluster, because somebody created 10k transforms between us merging this PR and putting sensible paging parameters in a follow-up PR, are infinitesimal (7.1 is not even released yet).
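As an illustration of those defaults, a paged GET might look like this; the from/size parameter names and the endpoint path are assumptions pending the follow-up PR.

// Hypothetical paged GET; parameter names and path are assumed.
Request get = new Request("GET", "_data_frame/transforms");
get.addParameter("from", "0");    // assumed default start
get.addParameter("size", "100");  // assumed default page size
Response response = client().performRequest(get);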
I am fine if that gets addressed in a PR that hits 7.1, sure. When the discussion started it wasn't clear what "follow up PR" meant (also given that the problem existed prior to this PR).
Pagination with a sensible limit per page sounds great! No need to further discuss optimizing the size of the list once we have that.
We need to step away from the idea of returning > 10K results. I think the way forward is to allow searching (i.e. implement the AbstractTransportGetResourcesAction). Then if a user hits > 10K results, we should error. The user can then change the query in order to reduce the matching resources. Anything else is not scalable.
final DiscoveryNodes nodes = state.nodes();

if (nodes.isLocalNodeElectedMaster()) {
    if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) {
Did DataFramePersistentTaskUtils become dead code? If so, can you remove it?
@hendrikmuhs, not yet. Once we have stats in an index instead of cluster state, the stats endpoint can be refactored to not use DataFramePersistentTaskUtils.stateHasDataFrameTransforms any longer. At that point, it will be dead code and can be exorcised from the code base.
        new ActionListenerResponseHandler<>(listener, Response::new));
    }
}
//TODO support comma delimited and simple regex IDs
I am hitting a need for this too for getting sets of data frame analytics jobs. So it would be good if you could add a reusable class for doing this into the X-Pack core library.
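For reference, one minimal shape such a shared helper could take; this is a sketch, not the class that was eventually added to X-Pack core, and all names here are hypothetical.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Sketch: expand a comma-delimited ID expression with simple '*' wildcards
// into a predicate that can be applied to resource IDs.
final class IdExpressionMatcher {

    static Predicate<String> forExpression(String expression) {
        List<Pattern> patterns = new ArrayList<>();
        for (String token : expression.split(",")) {
            // quote the literal parts so only '*' acts as a wildcard
            String regex = Pattern.quote(token.trim()).replace("*", "\\E.*\\Q");
            patterns.add(Pattern.compile(regex));
        }
        return id -> patterns.stream().anyMatch(p -> p.matcher(id).matches());
    }
}

// e.g. IdExpressionMatcher.forExpression("df-*,other").test("df-1") returns true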
LGTM
I'm happy to merge this and tidy up the loose ends in follow up PRs.
[Data Frame] Refactor PUT transform such that:
* POST _start creates the task and starts it
* GET transforms queries docs instead of tasks
* POST _stop verifies the stored config exists before trying to stop the task
* Addressing PR comments
* Refactoring DataFrameFeatureSet#usage, decreasing the size returned by getTransformConfigurations
* Fixing failing usage test