Skip to content

Commit

Permalink
Add data stream timestamp validation via metadata field mapper (#58582)
Browse files Browse the repository at this point in the history
This commit adds a new metadata field mapper that validates,
that a document has exactly a single timestamp value in the data stream timestamp field and
that the timestamp field mapping only has `type`, `meta` or `format` attributes configured.
Other attributes can affect the guarantee that an index with this meta field mapper has a 
useable timestamp field.

The MetadataCreateIndexService inserts a data stream timestamp field mapper whenever
a new backing index of a data stream is created.

Relates to #53100
  • Loading branch information
martijnvg authored Jul 2, 2020
1 parent 2717697 commit 001b3fb
Show file tree
Hide file tree
Showing 24 changed files with 662 additions and 79 deletions.
4 changes: 2 additions & 2 deletions docs/reference/indices/rollover-index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ PUT _index_template/template
"template": {
"mappings": {
"properties": {
"@timestamp": {
"date": {
"type": "date"
}
}
}
},
"data_stream": {
"timestamp_field": "@timestamp"
"timestamp_field": "date"
}
}
-----------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
index: simple-data-stream1
id: 1
op_type: create
body: { "text": "test" }
body:
foo: bar
'@timestamp': '2020-12-12'

- do:
indices.refresh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ teardown:
index:
index: logs-foobar
refresh: true
body: { foo: bar }
body:
foo: bar
timestamp: '2020-12-12'

- do:
reindex:
Expand Down Expand Up @@ -65,7 +67,9 @@ teardown:
index:
index: old-logs-index
refresh: true
body: { foo: bar }
body:
foo: bar
timestamp: '2020-12-12'

- do:
reindex:
Expand Down Expand Up @@ -96,7 +100,9 @@ teardown:
index:
index: logs-foobar
refresh: true
body: { foo: bar }
body:
foo: bar
timestamp: '2020-12-12'

- do:
reindex:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
index: simple-data-stream1
id: 1
op_type: create
body: { "number": 4 }
body: { "number": 4, '@timestamp': '2020-12-12' }

# rollover data stream to create new backing index
- do:
Expand All @@ -47,7 +47,7 @@
index: simple-data-stream1
id: 2
op_type: create
body: { "number": 1 }
body: { "number": 1, '@timestamp': '2020-12-12' }

# rollover data stream to create another new backing index
- do:
Expand All @@ -64,7 +64,7 @@
index: simple-data-stream1
id: 3
op_type: create
body: { "number": 5 }
body: { "number": 5, '@timestamp': '2020-12-12' }

- do:
indices.refresh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ setup:
- do:
index:
index: simple-data-stream1
body: { foo: bar }
body:
'@timestamp': '2020-12-12'
foo: bar

- do:
indices.refresh:
Expand Down Expand Up @@ -241,27 +243,27 @@ setup:
- do:
index:
index: logs-foobar
body: { foo: bar }
body: { timestamp: '2020-12-12' }
- match: { _index: .ds-logs-foobar-000001 }

- do:
catch: bad_request
index:
index: .ds-logs-foobar-000001
body: { foo: bar }
body: { timestamp: '2020-12-12' }

- do:
bulk:
body:
- create:
_index: .ds-logs-foobar-000001
- foo: bar
- timestamp: '2020-12-12'
- index:
_index: .ds-logs-foobar-000001
- foo: bar
- timestamp: '2020-12-12'
- create:
_index: logs-foobar
- foo: bar
- timestamp: '2020-12-12'
- match: { errors: true }
- match: { items.0.create.status: 400 }
- match: { items.0.create.error.type: illegal_argument_exception }
Expand All @@ -276,3 +278,58 @@ setup:
indices.delete_data_stream:
name: logs-foobar
- is_true: acknowledged

---
"Indexing a document into a data stream without a timestamp field":
- skip:
version: " - 7.9.99"
reason: "enable in 7.9+ when backported"
features: allowed_warnings

- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
indices.put_index_template:
name: generic_logs_template
body:
index_patterns: logs-*
template:
mappings:
properties:
'timestamp':
type: date
data_stream:
timestamp_field: timestamp

- do:
catch: bad_request
index:
index: logs-foobar
body: { foo: bar }

- do:
bulk:
body:
- create:
_index: logs-foobar
- foo: bar
- create:
_index: logs-foobar
- timestamp: '2020-12-12'
- create:
_index: logs-foobar
- timestamp: ['2020-12-12', '2022-12-12']
- match: { errors: true }
- match: { items.0.create.status: 400 }
- match: { items.0.create.error.caused_by.type: illegal_argument_exception }
- match: { items.0.create.error.caused_by.reason: "data stream timestamp field [timestamp] is missing" }
- match: { items.1.create.result: created }
- match: { items.1.create._index: .ds-logs-foobar-000001 }
- match: { items.2.create.status: 400 }
- match: { items.2.create.error.caused_by.type: illegal_argument_exception }
- match: { items.2.create.error.caused_by.reason: "data stream timestamp field [timestamp] encountered multiple values" }

- do:
indices.delete_data_stream:
name: logs-foobar
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
index:
index: logs-foobar
refresh: true
body: { foo: bar }
body:
'@timestamp': '2020-12-12'
foo: bar
- match: {_index: .ds-logs-foobar-000001}

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
index:
index: logs-foobar
refresh: true
body: { foo: bar }
body:
'timestamp': '2020-12-12'
foo: bar

- do:
search:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,28 +233,28 @@ public void testMixedAutoCreate() throws Exception {
client().execute(PutComposableIndexTemplateAction.INSTANCE, createTemplateRequest).actionGet();

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));

bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));

bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz3").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo3").opType(CREATE).source("{}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-foobaz3").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("logs-barfoo3").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));

Expand Down
Loading

0 comments on commit 001b3fb

Please sign in to comment.