diff --git a/changelog.d/21860_elasticsearch_add_update_bulk_api.feature.md b/changelog.d/21860_elasticsearch_add_update_bulk_api.feature.md new file mode 100644 index 0000000000000..cf7a4d3cb553c --- /dev/null +++ b/changelog.d/21860_elasticsearch_add_update_bulk_api.feature.md @@ -0,0 +1,3 @@ +Add `update` support to the bulk API to Elasticsearch Sink. + +authors: blackrez diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 9be7a814fb7f6..e6312d6b84277 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -282,7 +282,7 @@ impl ElasticsearchConfig { pub struct BulkConfig { /// Action to use when making requests to the [Elasticsearch Bulk API][es_bulk]. /// - /// Only `index` and `create` actions are supported. + /// Only `index`, `create` and `update` actions are supported. /// /// [es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html #[serde(default = "default_bulk_action")] diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index ab4a19ccb15f5..4f07fca4cff2b 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -87,6 +87,9 @@ pub enum BulkAction { /// The `create` action. Create, + + /// The `update` action. + Update, } #[allow(clippy::trivially_copy_pass_by_ref)] @@ -95,6 +98,7 @@ impl BulkAction { match self { BulkAction::Index => "index", BulkAction::Create => "create", + BulkAction::Update => "update", } } @@ -102,6 +106,7 @@ impl BulkAction { match self { BulkAction::Index => "/index", BulkAction::Create => "/create", + BulkAction::Update => "/update", } } } @@ -113,6 +118,7 @@ impl TryFrom<&str> for BulkAction { match input { "index" => Ok(BulkAction::Index), "create" => Ok(BulkAction::Create), + "update" => Ok(BulkAction::Update), _ => Err(format!("Invalid bulk action: {}", input)), } } diff --git a/src/sinks/elasticsearch/retry.rs b/src/sinks/elasticsearch/retry.rs index 705d54ed9189e..76a48216d7bf1 100644 --- a/src/sinks/elasticsearch/retry.rs +++ b/src/sinks/elasticsearch/retry.rs @@ -56,6 +56,8 @@ enum EsResultItem { Index(EsIndexResult), #[serde(rename = "create")] Create(EsIndexResult), + #[serde(rename = "update")] + Update(EsIndexResult), } impl EsResultItem { @@ -64,6 +66,7 @@ impl EsResultItem { match self { EsResultItem::Index(r) => r, EsResultItem::Create(r) => r, + EsResultItem::Update(r) => r, } } } diff --git a/website/cue/reference/components/sinks/base/elasticsearch.cue b/website/cue/reference/components/sinks/base/elasticsearch.cue index 3dfa6c0e17ec2..c70767fb062fe 100644 --- a/website/cue/reference/components/sinks/base/elasticsearch.cue +++ b/website/cue/reference/components/sinks/base/elasticsearch.cue @@ -249,7 +249,7 @@ base: components: sinks: elasticsearch: configuration: { description: """ Action to use when making requests to the [Elasticsearch Bulk API][es_bulk]. - Only `index` and `create` actions are supported. + Only `index`, `create` and `update` actions are supported. [es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html """ diff --git a/website/cue/reference/components/sinks/elasticsearch.cue b/website/cue/reference/components/sinks/elasticsearch.cue index 18d7ab204bb01..d0bc191f5f4ba 100644 --- a/website/cue/reference/components/sinks/elasticsearch.cue +++ b/website/cue/reference/components/sinks/elasticsearch.cue @@ -91,6 +91,9 @@ components: sinks: elasticsearch: { inserted via the `index` action, which replaces documents if an existing one has the same `id`. If `bulk.action` is configured with `create`, Elasticsearch does _not_ replace an existing document and instead returns a conflict error. + When `bulk.action` is set to `update`, the document is updated with several constraints. + The message must be added in `.doc` and have `.doc_as_upsert` to true. + The `update` operation requires the `id_key` to be set, and the `encoding` field should specify `doc` and `doc_as_upsert` as values. """ }