diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 14232d2cc1f7..ccac6ce56504 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -218,6 +218,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d - Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957] - Made GCS input GA and updated docs accordingly. {pull}37127[37127] - Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160] +- Prevent CEL input from re-entering the eval loop when an evaluation failed. {pull}37161[37161] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index 786f98aa842f..7c6c0ca0469a 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -140,7 +140,7 @@ The field should be an array, but in the case of an error condition in the CEL p <3> If `rate_limit` is present it must be a map with numeric fields `rate` and `burst`. The `rate_limit` field may also have a string `error` field and other fields which will be logged. If it has an `error` field, the `rate` and `burst` will not be used to set rate limit behavior. The {mito_docs}/lib#Limit[Limit], and {mito_docs}/lib#OktaRateLimit[Okta Rate Limit policy] and {mito_docs}/lib#DraftRateLimit[Draft Rate Limit policy] documentation show how to construct this field. -<4> The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true, and a non-zero events array is returned. +<4> The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true, and a non-zero events array is returned. If the "want_more" field is present after a failed evaluation, it is set to false. The `status_code`, `header` and `rate_limit` values may be omitted if the program is not interacting with an HTTP API end-point and so will not be needed to contribute to program control. diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 69b4da8ac444..b65fb660314f 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -978,12 +978,14 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{} } if err != nil { state["events"] = errorMessage(fmt.Sprintf("failed eval: %v", err)) + clearWantMore(state) return state, fmt.Errorf("failed eval: %w", err) } v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Struct)(nil))) if err != nil { state["events"] = errorMessage(fmt.Sprintf("failed proto conversion: %v", err)) + clearWantMore(state) return state, fmt.Errorf("failed proto conversion: %w", err) } switch v := v.(type) { @@ -993,10 +995,21 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{} // This should never happen. errMsg := fmt.Sprintf("unexpected native conversion type: %T", v) state["events"] = errorMessage(errMsg) + clearWantMore(state) return state, errors.New(errMsg) } } +// clearWantMore sets the state to not request additional work in a periodic evaluation. +// It leaves state intact if there is no "want_more" element, and sets the element to false +// if there is. This is necessary instead of just doing delete(state, "want_more") as +// client CEL code may expect the want_more field to be present. +func clearWantMore(state map[string]interface{}) { + if _, ok := state["want_more"]; ok { + state["want_more"] = false + } +} + func errorMessage(msg string) map[string]interface{} { return map[string]interface{}{"error": map[string]interface{}{"message": msg}} }