Skip to content

Commit

Permalink
enhancement(elasticsearch sink): Add update support to bulk action (#…
Browse files Browse the repository at this point in the history
…21860)

* feat(elasticsearch sink): add update in bulk api

* doc(elasticsearch sink): add update in bulk api

* add changelog for elasticsearch bulk API

* fix spelling

* fix the changelog name

* Fix endline for changelog

* fix format code

* this time check passed

* add generated document for elasticsearch

* Update website/cue/reference/components/sinks/base/elasticsearch.cue

Co-authored-by: DeForest Richards <[email protected]>

* Update website/cue/reference/components/sinks/elasticsearch.cue

Co-authored-by: DeForest Richards <[email protected]>

* Update website/cue/reference/components/sinks/elasticsearch.cue

Co-authored-by: DeForest Richards <[email protected]>

* generate docs after change

---------

Co-authored-by: DeForest Richards <[email protected]>
  • Loading branch information
blackrez and drichards-87 authored Nov 22, 2024
1 parent 74f605b commit fd9166b
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add `update` support to the bulk API to Elasticsearch Sink.

authors: blackrez
2 changes: 1 addition & 1 deletion src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl ElasticsearchConfig {
pub struct BulkConfig {
/// Action to use when making requests to the [Elasticsearch Bulk API][es_bulk].
///
/// Only `index` and `create` actions are supported.
/// Only `index`, `create` and `update` actions are supported.
///
/// [es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
#[serde(default = "default_bulk_action")]
Expand Down
6 changes: 6 additions & 0 deletions src/sinks/elasticsearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub enum BulkAction {

/// The `create` action.
Create,

/// The `update` action.
Update,
}

#[allow(clippy::trivially_copy_pass_by_ref)]
Expand All @@ -95,13 +98,15 @@ impl BulkAction {
match self {
BulkAction::Index => "index",
BulkAction::Create => "create",
BulkAction::Update => "update",
}
}

pub const fn as_json_pointer(&self) -> &'static str {
match self {
BulkAction::Index => "/index",
BulkAction::Create => "/create",
BulkAction::Update => "/update",
}
}
}
Expand All @@ -113,6 +118,7 @@ impl TryFrom<&str> for BulkAction {
match input {
"index" => Ok(BulkAction::Index),
"create" => Ok(BulkAction::Create),
"update" => Ok(BulkAction::Update),
_ => Err(format!("Invalid bulk action: {}", input)),
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/elasticsearch/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ enum EsResultItem {
Index(EsIndexResult),
#[serde(rename = "create")]
Create(EsIndexResult),
#[serde(rename = "update")]
Update(EsIndexResult),
}

impl EsResultItem {
Expand All @@ -64,6 +66,7 @@ impl EsResultItem {
match self {
EsResultItem::Index(r) => r,
EsResultItem::Create(r) => r,
EsResultItem::Update(r) => r,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ base: components: sinks: elasticsearch: configuration: {
description: """
Action to use when making requests to the [Elasticsearch Bulk API][es_bulk].
Only `index` and `create` actions are supported.
Only `index`, `create` and `update` actions are supported.
[es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
"""
Expand Down
3 changes: 3 additions & 0 deletions website/cue/reference/components/sinks/elasticsearch.cue
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ components: sinks: elasticsearch: {
inserted via the `index` action, which replaces documents if an existing
one has the same `id`. If `bulk.action` is configured with `create`, Elasticsearch
does _not_ replace an existing document and instead returns a conflict error.
When `bulk.action` is set to `update`, the document is updated with several constraints.
The message must be added in `.doc` and have `.doc_as_upsert` to true.
The `update` operation requires the `id_key` to be set, and the `encoding` field should specify `doc` and `doc_as_upsert` as values.
"""
}

Expand Down

0 comments on commit fd9166b

Please sign in to comment.