diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs index 30fb7af0db42..58c6aa520a61 100644 --- a/src/servers/src/elasticsearch.rs +++ b/src/servers/src/elasticsearch.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use std::time::Instant; -use axum::extract::{Query, State}; +use axum::extract::{Path, Query, State}; use axum::headers::ContentType; use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; use axum::response::IntoResponse; @@ -32,7 +32,8 @@ use crate::error::{ Result as ServersResult, }; use crate::http::event::{ - ingest_logs_inner, LogIngesterQueryParams, LogState, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, + ingest_logs_inner, LogIngestRequest, LogIngesterQueryParams, LogState, + GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, }; use crate::metrics::{ METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED, @@ -81,15 +82,39 @@ pub async fn handle_get_license() -> impl IntoResponse { (StatusCode::OK, elasticsearch_headers(), axum::Json(body)) } -// Process `_bulk` API requests. Only support to create logs. -// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request. +/// Process `_bulk` API requests. Only support to create logs. +/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request. #[axum_macros::debug_handler] pub async fn handle_bulk_api( State(log_state): State, Query(params): Query, - Extension(mut query_ctx): Extension, + Extension(query_ctx): Extension, TypedHeader(_content_type): TypedHeader, payload: String, +) -> impl IntoResponse { + do_handle_bulk_api(log_state, None, params, query_ctx, payload).await +} + +/// Process `/${index}/_bulk` API requests. Only support to create logs. +/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request. +#[axum_macros::debug_handler] +pub async fn handle_bulk_api_with_index( + State(log_state): State, + Path(index): Path, + Query(params): Query, + Extension(query_ctx): Extension, + TypedHeader(_content_type): TypedHeader, + payload: String, +) -> impl IntoResponse { + do_handle_bulk_api(log_state, Some(index), params, query_ctx, payload).await +} + +async fn do_handle_bulk_api( + log_state: LogState, + index: Option, + params: LogIngesterQueryParams, + mut query_ctx: QueryContext, + payload: String, ) -> impl IntoResponse { let start = Instant::now(); debug!( @@ -107,21 +132,6 @@ pub async fn handle_bulk_api( .with_label_values(&[&db]) .start_timer(); - let table = if let Some(table) = params.table { - table - } else { - return ( - StatusCode::BAD_REQUEST, - elasticsearch_headers(), - axum::Json(write_bulk_response( - start.elapsed().as_millis() as i64, - 0, - StatusCode::BAD_REQUEST.as_u16() as u32, - "require parameter 'table'", - )), - ); - }; - // If pipeline_name is not provided, use the internal pipeline. let pipeline = if let Some(pipeline) = params.pipeline_name { pipeline @@ -130,8 +140,8 @@ pub async fn handle_bulk_api( }; // Read the ndjson payload and convert it to a vector of Value. - let log_values = match convert_es_input_to_log_values(&payload, ¶ms.msg_field) { - Ok(log_values) => log_values, + let requests = match parse_bulk_request(&payload, &index, ¶ms.msg_field) { + Ok(requests) => requests, Err(e) => { return ( StatusCode::BAD_REQUEST, @@ -145,14 +155,13 @@ pub async fn handle_bulk_api( ); } }; - let log_num = log_values.len(); + let log_num = requests.len(); if let Err(e) = ingest_logs_inner( log_state.log_handler, pipeline, None, - table, - log_values, + requests, Arc::new(query_ctx), ) .await @@ -237,16 +246,18 @@ pub fn elasticsearch_headers() -> HeaderMap { ELASTICSEARCH_HEADERS.clone() } +// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests. // The input will be Elasticsearch bulk request in NDJSON format. // For example, the input will be like this: // { "index" : { "_index" : "test", "_id" : "1" } } // { "field1" : "value1" } // { "index" : { "_index" : "test", "_id" : "2" } } // { "field2" : "value2" } -fn convert_es_input_to_log_values( +fn parse_bulk_request( input: &str, + index_from_url: &Option, msg_field: &Option, -) -> ServersResult> { +) -> ServersResult> { // Read the ndjson payload and convert it to `Vec`. Return error if the input is not a valid JSON. let values: Vec = Deserializer::from_str(input) .into_iter::() @@ -261,49 +272,80 @@ fn convert_es_input_to_log_values( } ); - let mut log_values: Vec = Vec::with_capacity(values.len() / 2); + let mut requests: Vec = Vec::with_capacity(values.len() / 2); + let mut values = values.into_iter(); + // Read the ndjson payload and convert it to a (index, value) vector. // For Elasticsearch post `_bulk` API, each chunk contains two objects: - // 1. The first object is the command, it should be `create` or `index`. `create` is used for insert, `index` is used for upsert. - // 2. The second object is the document data. - let mut is_document = false; - for v in values { - if !is_document { - // Read the first object to get the command, it should be `create` or `index`. - ensure!( - v.get("create").is_some() || v.get("index").is_some(), - InvalidElasticsearchInputSnafu { - reason: format!( - "invalid bulk request, expected 'create' or 'index' but got {:?}", - v - ), - } - ); - is_document = true; - continue; - } + // 1. The first object is the command, it should be `create` or `index`. + // 2. The second object is the document data. + while let Some(mut cmd) = values.next() { + // NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command. + let index = if let Some(cmd) = cmd.get_mut("create") { + get_index_from_cmd(cmd.take())? + } else if let Some(cmd) = cmd.get_mut("index") { + get_index_from_cmd(cmd.take())? + } else { + return InvalidElasticsearchInputSnafu { + reason: format!( + "invalid bulk request, expected 'create' or 'index' but got {:?}", + cmd + ), + } + .fail(); + }; - // It means the second object is the document data. - if is_document { + // Read the second object to get the document data. Stop the loop if there is no document. + if let Some(document) = values.next() { // If the msg_field is provided, fetch the value of the field from the document data. let log_value = if let Some(msg_field) = msg_field { - get_log_value_from_msg_field(v, msg_field) + get_log_value_from_msg_field(document, msg_field) } else { - v + document }; - log_values.push(log_value); + ensure!( + index.is_some() || index_from_url.is_some(), + InvalidElasticsearchInputSnafu { + reason: "missing index in bulk request".to_string(), + } + ); - // Reset the flag for the next chunk. - is_document = false; + requests.push(LogIngestRequest { + table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()), + values: vec![log_value], + }); } } - debug!("Received log data: {:?}", log_values); + debug!( + "Received {} log ingest requests: {:?}", + requests.len(), + requests + ); - Ok(log_values) + Ok(requests) } +// Get the index from the command. We will take index as the table name in GreptimeDB. +fn get_index_from_cmd(mut v: Value) -> ServersResult> { + if let Some(index) = v.get_mut("_index") { + if let Value::String(index) = index.take() { + Ok(Some(index)) + } else { + // If the `_index` exists, it should be a string. + InvalidElasticsearchInputSnafu { + reason: "index is not a string in bulk request".to_string(), + } + .fail() + } + } else { + Ok(None) + } +} + +// If the msg_field is provided, fetch the value of the field from the document data. +// For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello"). fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value { if let Some(message) = v.get_mut(msg_field) { let message = message.take(); @@ -327,7 +369,7 @@ mod tests { use super::*; #[test] - fn test_convert_es_input_to_log_values() { + fn test_parse_bulk_request() { let test_cases = vec![ // Normal case. ( @@ -338,9 +380,76 @@ mod tests { {"foo2":"foo2_value","bar2":"bar2_value"} "#, None, + None, + Ok(vec![ + LogIngestRequest { + table: "test".to_string(), + values: vec![ + json!({"foo1": "foo1_value", "bar1": "bar1_value"}), + ], + }, + LogIngestRequest { + table: "test".to_string(), + values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})], + }, + ]), + ), + // Case with index. + ( + r#" + {"create":{"_index":"test","_id":"1"}} + {"foo1":"foo1_value", "bar1":"bar1_value"} + {"create":{"_index":"logs","_id":"2"}} + {"foo2":"foo2_value","bar2":"bar2_value"} + "#, + Some("logs".to_string()), + None, + Ok(vec![ + LogIngestRequest { + table: "test".to_string(), + values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})], + }, + LogIngestRequest { + table: "logs".to_string(), + values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})], + }, + ]), + ), + // Case with index. + ( + r#" + {"create":{"_index":"test","_id":"1"}} + {"foo1":"foo1_value", "bar1":"bar1_value"} + {"create":{"_index":"logs","_id":"2"}} + {"foo2":"foo2_value","bar2":"bar2_value"} + "#, + Some("logs".to_string()), + None, + Ok(vec![ + LogIngestRequest { + table: "test".to_string(), + values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})], + }, + LogIngestRequest { + table: "logs".to_string(), + values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})], + }, + ]), + ), + // Case with incomplete bulk request. + ( + r#" + {"create":{"_index":"test","_id":"1"}} + {"foo1":"foo1_value", "bar1":"bar1_value"} + {"create":{"_index":"logs","_id":"2"}} + "#, + Some("logs".to_string()), + None, Ok(vec![ - json!({"foo1": "foo1_value", "bar1": "bar1_value"}), - json!({"foo2": "foo2_value", "bar2": "bar2_value"}), + LogIngestRequest { + table: "test".to_string(), + values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})], + }, ]), ), // Specify the `data` field as the message field and the value is a JSON string. @@ -351,10 +460,17 @@ mod tests { {"create":{"_index":"test","_id":"2"}} {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"} "#, + None, Some("data".to_string()), Ok(vec![ - json!({"foo1": "foo1_value", "bar1": "bar1_value"}), - json!({"foo2": "foo2_value", "bar2": "bar2_value"}), + LogIngestRequest { + table: "test".to_string(), + values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})], + }, + LogIngestRequest { + table: "test".to_string(), + values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})], + }, ]), ), // Simulate the log data from Logstash. @@ -365,10 +481,21 @@ mod tests { {"create":{"_id":null,"_index":"logs-generic-default","routing":null}} {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}} "#, + None, Some("message".to_string()), Ok(vec![ - json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""), - json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""), + LogIngestRequest { + table: "logs-generic-default".to_string(), + values: vec![ + json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""), + ], + }, + LogIngestRequest { + table: "logs-generic-default".to_string(), + values: vec![ + json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""), + ], + }, ]), ), // With invalid bulk request. @@ -378,18 +505,19 @@ mod tests { { "foo1" : "foo1_value", "bar1" : "bar1_value" } "#, None, + None, Err(InvalidElasticsearchInputSnafu { reason: "it's a invalid bulk request".to_string(), }), ), ]; - for (input, msg_field, expected) in test_cases { - let log_values = convert_es_input_to_log_values(input, &msg_field); + for (input, index, msg_field, expected) in test_cases { + let requests = parse_bulk_request(input, &index, &msg_field); if expected.is_ok() { - assert_eq!(log_values.unwrap(), expected.unwrap()); + assert_eq!(requests.unwrap(), expected.unwrap()); } else { - assert!(log_values.is_err()); + assert!(requests.is_err()); } } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 1c32466566cc..0b6d275c3c40 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -769,6 +769,10 @@ impl HttpServer { // Return fake response for Elasticsearch license request. .route("/_license", routing::get(elasticsearch::handle_get_license)) .route("/_bulk", routing::post(elasticsearch::handle_bulk_api)) + .route( + "/:index/_bulk", + routing::post(elasticsearch::handle_bulk_api_with_index), + ) // Return fake response for Elasticsearch ilm request. .route( "/_ilm/policy/*path", diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 54421c9886a5..17fc56f56135 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -84,6 +84,16 @@ pub struct LogIngesterQueryParams { pub msg_field: Option, } +/// LogIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple LogIngestRequests. +/// Multiple LogIngestRequests will be ingested into the same database with the same pipeline. +#[derive(Debug, PartialEq, Eq)] +pub(crate) struct LogIngestRequest { + /// The table where the log data will be written to. + pub table: String, + /// The log data to be ingested. + pub values: Vec, +} + pub struct PipelineContent(String); #[async_trait] @@ -513,8 +523,10 @@ pub async fn log_ingester( handler, pipeline_name, version, - table_name, - value, + vec![LogIngestRequest { + table: table_name, + values: value, + }], query_ctx, ) .await @@ -543,74 +555,78 @@ pub(crate) async fn ingest_logs_inner( state: PipelineHandlerRef, pipeline_name: String, version: PipelineVersion, - table_name: String, - pipeline_data: Vec, + log_ingest_requests: Vec, query_ctx: QueryContextRef, ) -> Result { let db = query_ctx.get_db_string(); let exec_timer = std::time::Instant::now(); - let mut results = Vec::with_capacity(pipeline_data.len()); - let transformed_data: Rows; - if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - let table = state - .get_table(&table_name, &query_ctx) - .await - .context(CatalogSnafu)?; - let rows = pipeline::identity_pipeline(pipeline_data, table) - .context(PipelineTransformSnafu) - .context(PipelineSnafu)?; + let mut insert_requests = Vec::with_capacity(log_ingest_requests.len()); - transformed_data = rows - } else { - let pipeline = state - .get_pipeline(&pipeline_name, version, query_ctx.clone()) - .await?; - - let transform_timer = std::time::Instant::now(); - let mut intermediate_state = pipeline.init_intermediate_state(); - - for v in pipeline_data { - pipeline - .prepare(v, &mut intermediate_state) - .inspect_err(|_| { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - }) - .context(PipelineTransformSnafu) - .context(PipelineSnafu)?; - let r = pipeline - .exec_mut(&mut intermediate_state) - .inspect_err(|_| { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - }) + for request in log_ingest_requests { + let transformed_data: Rows = if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { + let table = state + .get_table(&request.table, &query_ctx) + .await + .context(CatalogSnafu)?; + pipeline::identity_pipeline(request.values, table) .context(PipelineTransformSnafu) - .context(PipelineSnafu)?; - results.push(r); - pipeline.reset_intermediate_state(&mut intermediate_state); - } + .context(PipelineSnafu)? + } else { + let pipeline = state + .get_pipeline(&pipeline_name, version, query_ctx.clone()) + .await?; - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); + let transform_timer = std::time::Instant::now(); + let mut intermediate_state = pipeline.init_intermediate_state(); + let mut results = Vec::with_capacity(request.values.len()); + for v in request.values { + pipeline + .prepare(v, &mut intermediate_state) + .inspect_err(|_| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + }) + .context(PipelineTransformSnafu) + .context(PipelineSnafu)?; + let r = pipeline + .exec_mut(&mut intermediate_state) + .inspect_err(|_| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + }) + .context(PipelineTransformSnafu) + .context(PipelineSnafu)?; + results.push(r); + pipeline.reset_intermediate_state(&mut intermediate_state); + } - transformed_data = Rows { - rows: results, - schema: pipeline.schemas().clone(), + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + + Rows { + rows: results, + schema: pipeline.schemas().clone(), + } }; + + insert_requests.push(RowInsertRequest { + rows: Some(transformed_data), + table_name: request.table.clone(), + }); } - let insert_request = RowInsertRequest { - rows: Some(transformed_data), - table_name: table_name.clone(), - }; - let insert_requests = RowInsertRequests { - inserts: vec![insert_request], - }; - let output = state.insert(insert_requests, query_ctx).await; + let output = state + .insert( + RowInsertRequests { + inserts: insert_requests, + }, + query_ctx, + ) + .await; if let Ok(Output { data: OutputData::AffectedRows(rows), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 93042f9c0d30..917660a5c3e2 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -98,6 +98,7 @@ macro_rules! http_tests { test_loki_pb_logs, test_loki_json_logs, test_elasticsearch_logs, + test_elasticsearch_logs_with_index, ); )* }; @@ -1990,7 +1991,7 @@ pub async fn test_elasticsearch_logs(store_type: StorageType) { HeaderName::from_static("content-type"), HeaderValue::from_static("application/json"), )], - "/v1/elasticsearch/_bulk?table=elasticsearch_logs_test", + "/v1/elasticsearch/_bulk", body.as_bytes().to_vec(), false, ) @@ -1998,12 +1999,64 @@ pub async fn test_elasticsearch_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); - let expected = "[[\"foo_value2\",\"value2\"],[\"foo_value1\",\"value1\"]]"; + let expected = "[[\"foo_value1\",\"value1\"],[\"foo_value2\",\"value2\"]]"; validate_data( "test_elasticsearch_logs", &client, - "select foo, bar from elasticsearch_logs_test;", + "select foo, bar from test;", + expected, + ) + .await; + + guard.remove_all().await; +} + +pub async fn test_elasticsearch_logs_with_index(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_elasticsearch_logs_with_index").await; + + let client = TestClient::new(app); + + // It will write to test_index1 and test_index2(specified in the path). + let body = r#" + {"create":{"_index":"test_index1","_id":"1"}} + {"foo":"foo_value1", "bar":"value1"} + {"create":{"_id":"2"}} + {"foo":"foo_value2","bar":"value2"} + "#; + + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/json"), + )], + "/v1/elasticsearch/test_index2/_bulk", + body.as_bytes().to_vec(), + false, + ) + .await; + + assert_eq!(StatusCode::OK, res.status()); + + // test content of test_index1 + let expected = "[[\"foo_value1\",\"value1\"]]"; + validate_data( + "test_elasticsearch_logs_with_index", + &client, + "select foo, bar from test_index1;", + expected, + ) + .await; + + // test content of test_index2 + let expected = "[[\"foo_value2\",\"value2\"]]"; + validate_data( + "test_elasticsearch_logs_with_index_2", + &client, + "select foo, bar from test_index2;", expected, ) .await;