Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry functionality #62

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions lib/logstash/outputs/google_bigquery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
require 'fileutils'
require 'concurrent'

java_import org.apache.logging.log4j.ThreadContext

#
# === Summary
#
Expand Down Expand Up @@ -165,6 +167,12 @@ class LogStash::Outputs::GoogleBigQuery < LogStash::Outputs::Base
# which causes the entire request to fail if any invalid rows exist.
config :skip_invalid_rows, validate: :boolean, default: false

# Retry appending data if the append request failed. If a partial failure was detected, retry only the failed rows; otherwise retry with the full batch.
config :max_tries, validate: :number, default: 1

# Wait this many seconds before attempting a retry.
config :retry_delay, validate: :number, default: nil

# The following configuration options still exist to alert users that are using them
config :uploader_interval_secs, validate: :number, deprecated: 'No longer used.'
config :deleter_interval_secs, validate: :number, deprecated: 'No longer used.'
Expand Down Expand Up @@ -236,9 +244,39 @@ def publish(messages)
table = get_table_name
@logger.info("Publishing #{messages.length} messages to #{table}")

create_table_if_not_exists table
begin
try_count ||= 0
try_count += 1

if @retry_delay and try_count > 1
sleep(@retry_delay)
end

create_table_if_not_exists table

failed_rows = @bq_client.append(@dataset, table, messages, @ignore_unknown_values, @skip_invalid_rows)
raise "failed rows" unless failed_rows.empty?
rescue => e
if try_count < @max_tries
if e.to_s == "failed rows"
@logger.warn "#{failed_rows.count} failed rows detected, will retry, remaining tries: #{@max_tries - try_count}."
messages = failed_rows
retry
end
@logger.warn "Cought exception, remaining tries: #{@max_tries - try_count}.", :exception => e
retry
else
if e.to_s != 'failed rows'
@logger.warn 'Giving up'
raise
end
end
ensure
if try_count > 1
@logger.warn "Publish succeeded at try #{try_count}/#{@max_tries}"
end
end

failed_rows = @bq_client.append(@dataset, table, messages, @ignore_unknown_values, @skip_invalid_rows)
write_to_errors_file(failed_rows, table) unless failed_rows.empty?
rescue StandardError => e
@logger.error 'Error uploading data.', :exception => e
Expand Down Expand Up @@ -275,7 +313,9 @@ def write_to_errors_file(messages, table)
end

def init_batcher_flush_thread
pipeline_id = ThreadContext.get('pipeline.id')
@flush_thread = Thread.new do
ThreadContext.put('pipeline.id', pipeline_id)
until stopping?
Stud.stoppable_sleep(@flush_interval_secs) { stopping? }

Expand Down
22 changes: 22 additions & 0 deletions spec/outputs/google_bigquery_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,28 @@
expect(subject).not_to receive(:create_table_if_not_exists)
end

# Retry behaviour of publish: with max_tries set to 2 and retry_delay set to 1,
# each example expects append to be called twice and sleep to be called once.
context 'if retry is configured' do
let(:config) { { 'project_id' => 'project', 'dataset' => 'dataset', 'csv_schema' => 'path:STRING,status:INTEGER,score:FLOAT', 'max_tries' => 2, 'retry_delay' => 1 } }

# Failure mode: append raises on every call.
it 'tries again if insert threw an exception' do
allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
# The stub supplies the raising behaviour; the expectation below only pins the call count.
# NOTE(review): this relies on the expectation falling back to the earlier stub's
# implementation, so the order of these two lines matters - confirm before reordering.
allow(bq_client).to receive(:append).and_raise('expected insert error')
expect(bq_client).to receive(:append).twice
# sleep is expected exactly once; it is also replaced here, so the example does not actually wait.
expect(subject).to receive(:sleep).once

subject.publish ['{"foo":"bar"}']
end

# Failure mode: append returns a non-empty array (presumably the rows that failed to insert).
it 'tries again on failed insert' do
allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
# Every call returns the same non-empty array, so the retry fails the same way.
allow(bq_client).to receive(:append).and_return([0])
expect(bq_client).to receive(:append).twice
expect(subject).to receive(:sleep).once

subject.publish ['{"foo":"bar"}']
end
end

it 'creates a table if it does not exist' do
allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
allow(bq_client).to receive(:append).and_return([])
Expand Down