Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: INLINESTATS #109583

Merged
merged 63 commits into from
Jul 24, 2024
Merged

ESQL: INLINESTATS #109583

merged 63 commits into from
Jul 24, 2024

Conversation

nik9000
Copy link
Member

@nik9000 nik9000 commented Jun 11, 2024

This implements INLINESTATS. Most of the heavy lifting is done by LOOKUP, with this change mostly adding a new abstraction to logical plans, an interface I'm calling Phased. Implementing this interface allows a logical plan node to cut the query into phases. INLINESTATS implements it by asking for a "first phase" that's the same query, up to INLINESTATS, but with INLINESTATS replaced with STATS. The next phase replaces the INLINESTATS with a hash join on the results of the first phase.

So, this query:

FROM foo
| EVAL bar = a * b
| INLINESTATS m = MAX(bar) BY b
| WHERE m = bar
| LIMIT 1

gets split into

FROM foo
| EVAL bar = a * b
| STATS m = MAX(bar) BY b

followed by

FROM foo
| EVAL bar = a * b
| LOOKUP (results of m = MAX(bar) BY b) ON b
| WHERE m = bar
| LIMIT 1

Here's an example of the syntax:

$ curl -k -XDELETE -u'elastic:password' 'http://localhost:9200/test'
$ for a in {0..99}; do
    echo -n $a
    rm -f /tmp/bulk
    for b in {0..999}; do
        echo '{"index": {}}' >> /tmp/bulk
        echo '{"a": '$a', "b": '$b'}' >> /tmp/bulk
    done
    curl -s -k -XPOST -u'elastic:password' -HContent-Type:application/json 'http://localhost:9200/test/_bulk?pretty' --data-binary @/tmp/bulk | grep errors
done
$ curl -s -k -XPOST -u'elastic:password' -HContent-Type:application/json 'http://localhost:9200/test/_forcemerge?max_num_segments=1&pretty'
$ curl -s -k -XPOST -u'elastic:password' -HContent-Type:application/json 'http://localhost:9200/test/_refresh?pretty'
$ curl -k -XPOST -u'elastic:password' -HContent-Type:application/json http://localhost:9200/_query?pretty -d'{
    "query": "FROM test | INLINESTATS m=MAX(a * b) BY b | WHERE m == a * b | SORT a DESC, b DESC | LIMIT 1",
    "profile": true
}'
{
  "columns" : [
    {
      "name" : "a",
      "type" : "long"
    },
    {
      "name" : "b",
      "type" : "long"
    },
    {
      "name" : "m",
      "type" : "long"
    }
  ],
  "values" : [
    [
      99,
      999,
      98901
    ]

Closes #107589

This implements `INLINESTATS`. Most of the heavy lifting is done by
`LOOKUP`, with this change mostly adding a new abstraction to logical
plans, and interface I'm calling `Phased`. Implementing this interface
allows a logical plan node to cut the query into phases. `INLINESTATS`
implements it by asking for a "first phase" that's the same query, up to
`INLINESTATS`, but with `INLINESTATS` replaced with `STATS`. The next
phase replaces the `INLINESTATS` with a `LOOKUP` on the results of the
first phase.

So, this query:
```
FROM foo
| EVAL bar = a * b
| INLINESTATS m = MAX(bar) BY b
| WHERE m = bar
| LIMIT 1
```

gets split into
```
FROM foo
| EVAL bar = a * b
| STATS m = MAX(bar) BY b
```

followed by
```
FROM foo
| EVAL bar = a * b
| LOOKUP (results of m = MAX(bar) BY b) ON b
| WHERE m = bar
| LIMIT 1
```
@nik9000
Copy link
Member Author

nik9000 commented Jun 11, 2024

This is riddled with NOCOMMITs, as any good prototype deserves to be.

It's also not clear that this is the right way to go about this. I mean, cutting the request into two is the right way to do it, I think. The reworking of the callbacks to make that possible seems like the sane way to do that.

But from a planning side, this is an "interesting" choice. We analyze the INLINESTATS call, but then don't run the optimizer on it at all - that seems bad. OTOH, we do run the optimizer on the two halves of the query. That seems good.

@nik9000
Copy link
Member Author

nik9000 commented Jul 3, 2024

I've extracted #110445 out of this one so I can get it and not have to deal with merge conflicts.

@nik9000
Copy link
Member Author

nik9000 commented Jul 9, 2024

image

@nik9000 nik9000 added the ES|QL-ui Impacts ES|QL UI label Jul 9, 2024
@nik9000
Copy link
Member Author

nik9000 commented Jul 10, 2024

There's a fun bug that I'm going to probably leave for a followup:

FROM airports
| INLINESTATS min_scalerank=MIN(scalerank) BY type
| MV_EXPAND type
| WHERE scalerank == MV_MIN(scalerank);

Hits the rule execution limit. It started from a typo - I meant to do scalerank == MV_MIN(min_scalerank) but didn't. And then it hit the rule execution limit. Then I removed extra commands from the query until I got to more minimal recreation.

Copy link
Member

@dnhatn dnhatn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnhatn I'm wondering if this Phased strategy would also work in case of rate function.

I encountered a similar issue while implementing metrics aggregations. To address it, I chose to execute mixed aggregations, wrapped in to_partial and from_partial, in a single phase along with other aggregations. I made this decision for two reasons:

  • Data consistency: As others have noted, we need to ensure data consistency between phases, which can be challenging because we open and close readers of target shards in batches on data nodes to avoid holding excessive resources (e.g., file descriptors).
  • Execution redundancy: We might need to execute the initial part of each phase multiple times, such as LuceneQuery and FieldExact.

An alternative approach I explored was implementing a multiplexed pipeline. This method involves broadcasting/scattering pages into multiple sub-plans and then gathering the pages. With InlineStats, we can gather and join pages using a HashJoin (or Eval). I did not spend enough time to make this alternative approach ready. However, we can revisit it if we encounter issues with maintaining data consistency across phases.

However, this PR looks good. Great work. Thank you, Nik!

FROM employees
| KEEP emp_no, languages
| INLINESTATS max_lang = MAX(languages)
| WHERE max_lang == languages
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice examples ❤️ . It would be lovely if we could push down the filter in the second phase to Lucene instead of using HashJoin, so that we can avoid scanning the entire dataset. Let's address this later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! I'm pretty sure we can push these down. That's on the followup list!

Copy link
Contributor

@astefan astefan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

I am ok with the code as is now and the awesome usecases it's unlocking, if some aspects will definitely be addressed later (some missing use cases and some performance improvements missing) and if we are ok with the "experimental" label until then.

@nik9000
Copy link
Member Author

nik9000 commented Jul 22, 2024

NOCOMMIT: Link the new javadocs for Phased and EsqlSession into the package level javadoc.

@nik9000
Copy link
Member Author

nik9000 commented Jul 23, 2024

NOCOMMIT: Link the new javadocs for Phased and EsqlSession into the package level javadoc.

Pushed a link.

Comment on lines 405 to 411
shadowingLimit0
required_capability: inlinestats

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| INLINESTATS env=VALUES(right) BY client_ip
| LIMIT 0
;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need the limit0 tests added here; the limit0 tests are only present in enrich.csv-spec so we can run at least the logical optimizer against tests that otherwise would need enrich_load.

Comment on lines +441 to +442
ROW city = "Zürich"
| INLINESTATS x=VALUES(city), x=VALUES(city)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add a shadowingSelf test!

| INLINESTATS city = COUNT(city)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. That.... Probably isn't going to work. I guess we do want it to work....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'd be required to be consistent with enrich, eval, dissect and grok. I think we can add this to the list of follow ups, as this probably requires work on LOOKUP resp. JOIN.

Object value = BlockUtils.toJavaObject(p.getBlock(i), 0);
values.add(new Alias(source(), s.name(), null, new Literal(source(), value, s.dataType()), aggregates.get(i).id()));
}
return new Eval(source(), child(), values);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeahp, this should work and is conceptually a bit nicer (IMHO) than doing that down in the physical mapping!

}

@Override
public LogicalPlan nextPhase(List<Attribute> schema, List<Page> firstPhaseResult) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using the schema from the first phase is reasonable, but throwing an IllegalArgumentException if the schema doesn't line up with what we expected will make our lives much easier, esp. if this should blow up in production.

Comment on lines +53 to +54
* <p>If there are multiple {@linkplain Phased} nodes in the plan we always
* operate on the lowest one first, counting from the data source "upwards".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

@nik9000 nik9000 added test-release Trigger CI checks against release build and removed release highlight labels Jul 23, 2024
@nik9000
Copy link
Member Author

nik9000 commented Jul 23, 2024

I've put this behind a feature flag and removed release highlight because it's not super clear that this'll get un-feature flagged in 8.16.

There are a bunch of extra follow ups. I had a conversation with @costin about moving around how we trigger the Phased nature. The plan now is to merge this as is and rework some things.

I'm going to push a few more tests and see if we can land this. Folks can experiment with it some more while we finish up.

}

@Override
public LogicalPlan nextPhase(List<Attribute> schema, List<Page> firstPhaseResult) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Nik, this is much better now!

I have one last nit: the check is slightly insufficient because

firstPhase().output().equals(schema) == false

will not look at the name ids (they are not checked in NamedExpression.equals(), nor in NamedExpression's descendants). The plan may become inconsistent if the name ids do not line up, even if the first phase produces the correct names and data types. I think what we need is a little helper that we should call here.

public static equalsAndSemanticEquals(List<Attribute> left, List<Attribute> right) {
    if (left.equals(right) == false) {
        return false;
    }
    for (int i = 0; i < left.size(); i++) {
        if (left.get(i).semanticEquals(right.get(i)) == false) {
            return false;
        }
    }
    return true;
}

We could put that into the Expressions class (not Expression).

Either this, or we ignore name ids from the first phase: in ungroupedNextPhase we already do this, because we obtain the name ids from the aggregates. In groupedNextPhase, however, we put the schema's attributes directly as the attributes of the local relation.

@nik9000 nik9000 added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Jul 24, 2024
@nik9000
Copy link
Member Author

nik9000 commented Jul 24, 2024

Release tests look unrelated. We'll fight with them as fight comes up.

@nik9000 nik9000 merged commit b5c6c2d into elastic:main Jul 24, 2024
14 of 16 checks passed
@nik9000 nik9000 deleted the inlinestats branch July 24, 2024 21:17
weizijun added a commit to weizijun/elasticsearch that referenced this pull request Jul 25, 2024
* main: (39 commits)
  Update README.asciidoc (elastic#111244)
  ESQL: INLINESTATS (elastic#109583)
  ESQL: Document a little of `DataType` (elastic#111250)
  Relax assertions in segment level field stats (elastic#111243)
  LogsDB data generator - support nested object field (elastic#111206)
  Validate `Authorization` header in Azure test fixture (elastic#111242)
  Fixing HistoryStoreTests.testPut() and testStoreWithHideSecrets() (elastic#111246)
  [ESQL] Remove Named Expcted Types map from testing infrastructure  (elastic#111213)
  Change visibility of createWriter to allow tests from a different package to override it (elastic#111234)
  [ES|QL] Remove EsqlDataTypes (elastic#111089)
  Mute org.elasticsearch.repositories.azure.AzureBlobContainerRetriesTests testReadNonexistentBlobThrowsNoSuchFileException elastic#111233
  Abstract codec lookup by name, to make CodecService extensible (elastic#111007)
  Add HTTPS support to `AzureHttpFixture` (elastic#111228)
  Unmuting tests related to free_context action being processed in ESSingleNodeTestCase (elastic#111224)
  Upgrade Azure SDK (elastic#111225)
  Collapse transport versions for 8.14.0 (elastic#111199)
  Make sure contender uses logs templates (elastic#111183)
  unmute HistogramPercentileAggregationTests.testBoxplotHistogram (elastic#111223)
  Refactor Quality Assurance test infrastructure (elastic#111195)
  Mute org.elasticsearch.xpack.restart.FullClusterRestartIT testDisableFieldNameField {cluster=UPGRADED} elastic#111222
  ...

# Conflicts:
#	server/src/main/java/org/elasticsearch/TransportVersions.java
@costin
Copy link
Member

costin commented Aug 28, 2024

FTR, I've raised an meta ticket to track my progress at #112266

@maggieghamry
Copy link

@costin would it be possible to update this blog https://www.elastic.co/search-labs/blog/esql-piped-query-language-goes-ga to note that INLINESTATS is not yet available/will only be available in 8.16.0?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/ES|QL AKA ESQL auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) ES|QL-ui Impacts ES|QL UI >feature Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) test-release Trigger CI checks against release build v8.16.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support inline stats
10 participants