Skip to content

Commit

Permalink
refactor(elasticsearch): use _index as greptimedb table in log inge…
Browse files Browse the repository at this point in the history
…stion and add `/${index}/_bulk` API (GreptimeTeam#5335)

* refactor(elasticsearch): use `_index` as greptimedb table in log ingestion and add `/${index}/_bulk` API

Signed-off-by: zyy17 <[email protected]>

* refactor: code review

---------

Signed-off-by: zyy17 <[email protected]>
  • Loading branch information
zyy17 authored Jan 13, 2025
1 parent 6eb746d commit 3f01f67
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 127 deletions.
260 changes: 194 additions & 66 deletions src/servers/src/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;
use std::time::Instant;

use axum::extract::{Query, State};
use axum::extract::{Path, Query, State};
use axum::headers::ContentType;
use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use axum::response::IntoResponse;
Expand All @@ -32,7 +32,8 @@ use crate::error::{
Result as ServersResult,
};
use crate::http::event::{
ingest_logs_inner, LogIngesterQueryParams, LogState, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
ingest_logs_inner, LogIngestRequest, LogIngesterQueryParams, LogState,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use crate::metrics::{
METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
Expand Down Expand Up @@ -81,15 +82,39 @@ pub async fn handle_get_license() -> impl IntoResponse {
(StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

// Process `_bulk` API requests. Only support to create logs.
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
/// Process `_bulk` API requests. Only support to create logs.
/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
State(log_state): State<LogState>,
Query(params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
Extension(query_ctx): Extension<QueryContext>,
TypedHeader(_content_type): TypedHeader<ContentType>,
payload: String,
) -> impl IntoResponse {
do_handle_bulk_api(log_state, None, params, query_ctx, payload).await
}

/// Process `/${index}/_bulk` API requests. Only support to create logs.
/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api_with_index(
State(log_state): State<LogState>,
Path(index): Path<String>,
Query(params): Query<LogIngesterQueryParams>,
Extension(query_ctx): Extension<QueryContext>,
TypedHeader(_content_type): TypedHeader<ContentType>,
payload: String,
) -> impl IntoResponse {
do_handle_bulk_api(log_state, Some(index), params, query_ctx, payload).await
}

async fn do_handle_bulk_api(
log_state: LogState,
index: Option<String>,
params: LogIngesterQueryParams,
mut query_ctx: QueryContext,
payload: String,
) -> impl IntoResponse {
let start = Instant::now();
debug!(
Expand All @@ -107,21 +132,6 @@ pub async fn handle_bulk_api(
.with_label_values(&[&db])
.start_timer();

let table = if let Some(table) = params.table {
table
} else {
return (
StatusCode::BAD_REQUEST,
elasticsearch_headers(),
axum::Json(write_bulk_response(
start.elapsed().as_millis() as i64,
0,
StatusCode::BAD_REQUEST.as_u16() as u32,
"require parameter 'table'",
)),
);
};

// If pipeline_name is not provided, use the internal pipeline.
let pipeline = if let Some(pipeline) = params.pipeline_name {
pipeline
Expand All @@ -130,8 +140,8 @@ pub async fn handle_bulk_api(
};

// Read the ndjson payload and convert it to a vector of Value.
let log_values = match convert_es_input_to_log_values(&payload, &params.msg_field) {
Ok(log_values) => log_values,
let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
Ok(requests) => requests,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Expand All @@ -145,14 +155,13 @@ pub async fn handle_bulk_api(
);
}
};
let log_num = log_values.len();
let log_num = requests.len();

if let Err(e) = ingest_logs_inner(
log_state.log_handler,
pipeline,
None,
table,
log_values,
requests,
Arc::new(query_ctx),
)
.await
Expand Down Expand Up @@ -237,16 +246,18 @@ pub fn elasticsearch_headers() -> HeaderMap {
ELASTICSEARCH_HEADERS.clone()
}

// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests.
// The input will be Elasticsearch bulk request in NDJSON format.
// For example, the input will be like this:
// { "index" : { "_index" : "test", "_id" : "1" } }
// { "field1" : "value1" }
// { "index" : { "_index" : "test", "_id" : "2" } }
// { "field2" : "value2" }
fn convert_es_input_to_log_values(
fn parse_bulk_request(
input: &str,
index_from_url: &Option<String>,
msg_field: &Option<String>,
) -> ServersResult<Vec<Value>> {
) -> ServersResult<Vec<LogIngestRequest>> {
// Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
let values: Vec<Value> = Deserializer::from_str(input)
.into_iter::<Value>()
Expand All @@ -261,49 +272,80 @@ fn convert_es_input_to_log_values(
}
);

let mut log_values: Vec<Value> = Vec::with_capacity(values.len() / 2);
let mut requests: Vec<LogIngestRequest> = Vec::with_capacity(values.len() / 2);
let mut values = values.into_iter();

// Read the ndjson payload and convert it to a (index, value) vector.
// For Elasticsearch post `_bulk` API, each chunk contains two objects:
// 1. The first object is the command, it should be `create` or `index`. `create` is used for insert, `index` is used for upsert.
// 2. The second object is the document data.
let mut is_document = false;
for v in values {
if !is_document {
// Read the first object to get the command, it should be `create` or `index`.
ensure!(
v.get("create").is_some() || v.get("index").is_some(),
InvalidElasticsearchInputSnafu {
reason: format!(
"invalid bulk request, expected 'create' or 'index' but got {:?}",
v
),
}
);
is_document = true;
continue;
}
// 1. The first object is the command, it should be `create` or `index`.
// 2. The second object is the document data.
while let Some(mut cmd) = values.next() {
// NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command.
let index = if let Some(cmd) = cmd.get_mut("create") {
get_index_from_cmd(cmd.take())?
} else if let Some(cmd) = cmd.get_mut("index") {
get_index_from_cmd(cmd.take())?
} else {
return InvalidElasticsearchInputSnafu {
reason: format!(
"invalid bulk request, expected 'create' or 'index' but got {:?}",
cmd
),
}
.fail();
};

// It means the second object is the document data.
if is_document {
// Read the second object to get the document data. Stop the loop if there is no document.
if let Some(document) = values.next() {
// If the msg_field is provided, fetch the value of the field from the document data.
let log_value = if let Some(msg_field) = msg_field {
get_log_value_from_msg_field(v, msg_field)
get_log_value_from_msg_field(document, msg_field)
} else {
v
document
};

log_values.push(log_value);
ensure!(
index.is_some() || index_from_url.is_some(),
InvalidElasticsearchInputSnafu {
reason: "missing index in bulk request".to_string(),
}
);

// Reset the flag for the next chunk.
is_document = false;
requests.push(LogIngestRequest {
table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
values: vec![log_value],
});
}
}

debug!("Received log data: {:?}", log_values);
debug!(
"Received {} log ingest requests: {:?}",
requests.len(),
requests
);

Ok(log_values)
Ok(requests)
}

// Get the index from the command. We will take index as the table name in GreptimeDB.
fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
if let Some(index) = v.get_mut("_index") {
if let Value::String(index) = index.take() {
Ok(Some(index))
} else {
// If the `_index` exists, it should be a string.
InvalidElasticsearchInputSnafu {
reason: "index is not a string in bulk request".to_string(),
}
.fail()
}
} else {
Ok(None)
}
}

// If the msg_field is provided, fetch the value of the field from the document data.
// For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello").
fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
if let Some(message) = v.get_mut(msg_field) {
let message = message.take();
Expand All @@ -327,7 +369,7 @@ mod tests {
use super::*;

#[test]
fn test_convert_es_input_to_log_values() {
fn test_parse_bulk_request() {
let test_cases = vec![
// Normal case.
(
Expand All @@ -338,9 +380,76 @@ mod tests {
{"foo2":"foo2_value","bar2":"bar2_value"}
"#,
None,
None,
Ok(vec![
LogIngestRequest {
table: "test".to_string(),
values: vec![
json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
],
},
LogIngestRequest {
table: "test".to_string(),
values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
},
]),
),
// Case with index.
(
r#"
{"create":{"_index":"test","_id":"1"}}
{"foo1":"foo1_value", "bar1":"bar1_value"}
{"create":{"_index":"logs","_id":"2"}}
{"foo2":"foo2_value","bar2":"bar2_value"}
"#,
Some("logs".to_string()),
None,
Ok(vec![
LogIngestRequest {
table: "test".to_string(),
values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
},
LogIngestRequest {
table: "logs".to_string(),
values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
},
]),
),
// Case with index.
(
r#"
{"create":{"_index":"test","_id":"1"}}
{"foo1":"foo1_value", "bar1":"bar1_value"}
{"create":{"_index":"logs","_id":"2"}}
{"foo2":"foo2_value","bar2":"bar2_value"}
"#,
Some("logs".to_string()),
None,
Ok(vec![
LogIngestRequest {
table: "test".to_string(),
values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
},
LogIngestRequest {
table: "logs".to_string(),
values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
},
]),
),
// Case with incomplete bulk request.
(
r#"
{"create":{"_index":"test","_id":"1"}}
{"foo1":"foo1_value", "bar1":"bar1_value"}
{"create":{"_index":"logs","_id":"2"}}
"#,
Some("logs".to_string()),
None,
Ok(vec![
json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
LogIngestRequest {
table: "test".to_string(),
values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
},
]),
),
// Specify the `data` field as the message field and the value is a JSON string.
Expand All @@ -351,10 +460,17 @@ mod tests {
{"create":{"_index":"test","_id":"2"}}
{"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
"#,
None,
Some("data".to_string()),
Ok(vec![
json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
LogIngestRequest {
table: "test".to_string(),
values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
},
LogIngestRequest {
table: "test".to_string(),
values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
},
]),
),
// Simulate the log data from Logstash.
Expand All @@ -365,10 +481,21 @@ mod tests {
{"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
{"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
"#,
None,
Some("message".to_string()),
Ok(vec![
json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
LogIngestRequest {
table: "logs-generic-default".to_string(),
values: vec![
json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
],
},
LogIngestRequest {
table: "logs-generic-default".to_string(),
values: vec![
json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
],
},
]),
),
// With invalid bulk request.
Expand All @@ -378,18 +505,19 @@ mod tests {
{ "foo1" : "foo1_value", "bar1" : "bar1_value" }
"#,
None,
None,
Err(InvalidElasticsearchInputSnafu {
reason: "it's a invalid bulk request".to_string(),
}),
),
];

for (input, msg_field, expected) in test_cases {
let log_values = convert_es_input_to_log_values(input, &msg_field);
for (input, index, msg_field, expected) in test_cases {
let requests = parse_bulk_request(input, &index, &msg_field);
if expected.is_ok() {
assert_eq!(log_values.unwrap(), expected.unwrap());
assert_eq!(requests.unwrap(), expected.unwrap());
} else {
assert!(log_values.is_err());
assert!(requests.is_err());
}
}
}
Expand Down
Loading

0 comments on commit 3f01f67

Please sign in to comment.