[ML] Text/Log categorization multi-bucket aggregation (#71752)
This commit adds a new multi-bucket aggregation: `categorize_text`

The aggregation follows a similar design to significant text in that it reads from `_source`
and re-analyzes the text as it is read.

The key difference is that it does not use the indexed field's analyzer, but instead relies on
the `ml_standard` tokenizer with specialized ML token filters. The tokenizer + filters are the
same ones that machine learning categorization anomaly jobs utilize.
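
For illustration, here is a minimal sketch of that read-and-re-analyze step using Lucene's `TokenStream` API. `WhitespaceAnalyzer` stands in for the `ml_standard` analyzer, which is an ML plugin component not reproduced here:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

class TokenReader {
    /** Re-analyze a _source text value and collect its tokens. */
    static List<String> tokens(Analyzer analyzer, String field, String text) throws IOException {
        List<String> result = new ArrayList<>();
        try (var stream = analyzer.tokenStream(field, text)) {
            CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                result.add(term.toString());
            }
            stream.end();
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // WhitespaceAnalyzer is only a stand-in for ml_standard here.
        try (Analyzer a = new WhitespaceAnalyzer()) {
            System.out.println(tokens(a, "message", "Node 3 shutting down"));
        }
    }
}
```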

The high-level logical flow is as follows (see the sketch after this list):
 - At each shard, read in the text field with a custom analyzer that uses the `ml_standard` tokenizer
 - Read the resulting tokens from the analyzer
 - Feed these tokens to a token tree algorithm (an adaptation of the Drain categorization algorithm)
 - Gather the individual log categories (the leaf nodes), sort them by doc_count, and ship those buckets to be merged
 - Merge all buckets that have the EXACT same key
 - Once all buckets are merged, pass those keys + counts to a new token tree for additional merging
 - That tree builds the final buckets, and those are returned to the user
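
The sort and exact-key-merge steps above amount to summing counts per identical key. A minimal, self-contained sketch of that part, with illustrative names (`Bucket` and `mergeExactKeys` are not from the actual code):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

record Bucket(String key, long docCount) {}

class BucketMerge {
    /** Merge buckets that have the EXACT same key, then sort by doc count. */
    static List<Bucket> mergeExactKeys(List<Bucket> shardBuckets) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Bucket b : shardBuckets) {
            counts.merge(b.key(), b.docCount(), Long::sum);
        }
        List<Bucket> merged = new ArrayList<>();
        counts.forEach((k, c) -> merged.add(new Bucket(k, c)));
        merged.sort(Comparator.comparingLong(Bucket::docCount).reversed());
        // The real aggregation would now feed these keys + counts into a
        // fresh token tree for a second round of similarity-based merging.
        return merged;
    }
}
```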

Algorithm explanation (a minimal sketch follows this list):

 - Each log is parsed with the `ml_standard` tokenizer
 - Each token is passed into a token tree
 - For the first `max_match_token` tokens, each token is stored in the tree; at `max_match_token + 1` (or `len(tokens)`, whichever comes first) a log group is created
 - If another log group already exists at that leaf, it is merged in if the two have at least `similarity_threshold` percent of tokens in common
     - Merging simply replaces tokens that differ between the groups with `*`
 - If a layer in the tree already has `max_unique_tokens` children, we add a `*` child and any new tokens are passed through there. The catch here is that on the final merge, we first attempt to merge together the subtrees with the smallest number of documents, especially if the new subtree has a higher document count.
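
A minimal sketch of that token tree. All names are illustrative; the actual Elasticsearch classes differ:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TokenTree {
    private final int maxMatchToken;          // prefix depth of the tree
    private final int maxUniqueTokens;        // max children per token position
    private final double similarityThreshold; // percent of tokens that must match

    private final Node root = new Node();

    TokenTree(int maxMatchToken, int maxUniqueTokens, double similarityThreshold) {
        this.maxMatchToken = maxMatchToken;
        this.maxUniqueTokens = maxUniqueTokens;
        this.similarityThreshold = similarityThreshold;
    }

    void add(List<String> tokens) {
        Node node = root;
        int depth = Math.min(maxMatchToken, tokens.size());
        for (int i = 0; i < depth; i++) {
            String token = tokens.get(i);
            // Once a layer already has max_unique_tokens children,
            // any unseen token is routed through a '*' child instead.
            if (!node.children.containsKey(token) && node.children.size() >= maxUniqueTokens) {
                token = "*";
            }
            node = node.children.computeIfAbsent(token, t -> new Node());
        }
        // At max_match_token + 1 (or len(tokens)) the leaf holds log groups.
        for (LogGroup group : node.groups) {
            if (group.similarity(tokens) >= similarityThreshold) {
                group.merge(tokens); // differing tokens become '*'
                return;
            }
        }
        node.groups.add(new LogGroup(tokens));
    }

    private static final class Node {
        final Map<String, Node> children = new HashMap<>();
        final List<LogGroup> groups = new ArrayList<>();
    }

    static final class LogGroup {
        final List<String> tokens;
        long docCount = 1;

        LogGroup(List<String> tokens) {
            this.tokens = new ArrayList<>(tokens);
        }

        // Percentage of token positions shared with the new message.
        double similarity(List<String> other) {
            int n = Math.min(tokens.size(), other.size());
            if (n == 0) {
                return 0;
            }
            int same = 0;
            for (int i = 0; i < n; i++) {
                if (tokens.get(i).equals(other.get(i))) {
                    same++;
                }
            }
            return 100.0 * same / n;
        }

        // Merging replaces tokens that differ between the two with '*'.
        void merge(List<String> other) {
            for (int i = 0; i < tokens.size(); i++) {
                if (i >= other.size() || !tokens.get(i).equals(other.get(i))) {
                    tokens.set(i, "*");
                }
            }
            docCount++;
        }
    }
}
```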

## Aggregation configuration

Here is an example on some OpenStack logs:
```js
POST openstack/_search?size=0
{
  "aggs": {
    "categories": {
      "categorize_text": {
        "field": "message", // The field to categorize
        "similarity_threshold": 20, // merge log groups if they are this similar
        "max_unique_tokens": 20, // Max Number of children per token position
        "max_match_token": 4, // Maximum tokens to build prefix trees
        "size": 1
      }
    }
  }
}
```

This will return buckets like the following, where `*` marks a token position that differed between the merged log messages:
```json
"aggregations" : {
    "categories" : {
      "buckets" : [
        {
          "doc_count" : 806,
          "key" : "nova-api.log.1.2017-05-16_13 INFO nova.osapi_compute.wsgi.server * HTTP/1.1 status len time"
        }
      ]
    }
  }
```
benwtrent authored Oct 4, 2021
1 parent 5de59aa commit 7a7fffc
Showing 34 changed files with 3,871 additions and 8 deletions.
@@ -22,6 +22,7 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.NameOrDefinition;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -197,6 +198,22 @@ public long nowInMillis() {
return 0;
}

@Override
public Analyzer getNamedAnalyzer(String analyzer) {
return null;
}

@Override
public Analyzer buildCustomAnalyzer(
IndexSettings indexSettings,
boolean normalizer,
NameOrDefinition tokenizer,
List<NameOrDefinition> charFilters,
List<NameOrDefinition> tokenFilters
) {
return null;
}

@Override
protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() {
33 changes: 33 additions & 0 deletions docs/build.gradle
@@ -1071,6 +1071,39 @@ buildRestTests.setups['farequote_datafeed'] = buildRestTests.setups['farequote_j
"indexes":"farequote"
}
'''
buildRestTests.setups['categorize_text'] = '''
- do:
indices.create:
index: log-messages
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
time:
type: date
message:
type: text
- do:
bulk:
index: log-messages
refresh: true
body: |
{"index": {"_id":"1"}}
{"time":"2016-02-07T00:01:00+0000", "message": "2016-02-07T00:00:00+0000 Node 3 shutting down"}
{"index": {"_id":"2"}}
{"time":"2016-02-07T00:02:00+0000", "message": "2016-02-07T00:00:00+0000 Node 5 starting up"}
{"index": {"_id":"3"}}
{"time":"2016-02-07T00:03:00+0000", "message": "2016-02-07T00:00:00+0000 Node 4 shutting down"}
{"index": {"_id":"4"}}
{"time":"2016-02-08T00:01:00+0000", "message": "2016-02-08T00:00:00+0000 Node 5 shutting down"}
{"index": {"_id":"5"}}
{"time":"2016-02-08T00:02:00+0000", "message": "2016-02-08T00:00:00+0000 User foo_325 logging on"}
{"index": {"_id":"6"}}
{"time":"2016-02-08T00:04:00+0000", "message": "2016-02-08T00:00:00+0000 User foo_864 logged off"}
'''
buildRestTests.setups['server_metrics_index'] = '''
- do:
indices.create:
2 changes: 2 additions & 0 deletions docs/reference/aggregations/bucket.asciidoc
@@ -20,6 +20,8 @@ include::bucket/adjacency-matrix-aggregation.asciidoc[]

include::bucket/autodatehistogram-aggregation.asciidoc[]

include::bucket/categorize-text-aggregation.asciidoc[]

include::bucket/children-aggregation.asciidoc[]

include::bucket/composite-aggregation.asciidoc[]