Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved logging when pipeline terminates by error #12274

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ def converge_state(pipeline_actions)

unless action_result.successful?
logger.error("Failed to execute action",
:id => action.pipeline_id,
:action_type => action_result.class,
:pipeline_id => action.pipeline_id,
:action => action.class.to_s.split('::').last,
:message => action_result.message,
:backtrace => action_result.backtrace
)
Expand Down
13 changes: 9 additions & 4 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
module LogStash; class JavaPipeline < JavaBasePipeline
include LogStash::Util::Loggable

class WorkerException < RuntimeError; end

java_import org.apache.logging.log4j.ThreadContext

attr_reader \
Expand Down Expand Up @@ -122,7 +124,8 @@ def start
@thread = Thread.new do
error_log_params = ->(e) {
default_logging_keys(
:exception => e,
:exception => e.class,
:message => e.message,
:backtrace => e.backtrace,
"pipeline.sources" => pipeline_source_details
)
Expand All @@ -133,10 +136,12 @@ def start
ThreadContext.put("pipeline.id", pipeline_id)
run
@finished_run.make_true
rescue => e
rescue WorkerException => e
# no need to log at ERROR level since this log will be redundant to the log in
# the worker loop thread global rescue clause
logger.debug("Pipeline terminated by worker error", error_log_params.call(e))
logger.debug("Worker terminated by error", error_log_params.call(e))
rescue => e
logger.error("Pipeline terminated by error", error_log_params.call(e))
ensure
# we must trap any exception here to make sure the following @finished_execution
# is always set to true regardless of any exception before in the close method call
Expand Down Expand Up @@ -337,7 +342,7 @@ def monitor_inputs_and_workers
# this is a worker thread termination
# delete it from @worker_threads so that wait_for_workers does not wait for it
@worker_threads.delete(terminated_thread)
raise("Worker thread terminated in pipeline.id: #{pipeline_id}")
raise WorkerException.new("Worker thread terminated in pipeline.id: #{pipeline_id}")
end
end

Expand Down