From b5151b4f501e21b03459a2a85f92836946f2aef3 Mon Sep 17 00:00:00 2001 From: rmontenegroo Date: Sun, 20 Dec 2020 18:59:04 -0300 Subject: [PATCH] Use bucket based on placeholder Signed-off-by: rmontenegroo --- .gitignore | 2 + VERSION | 2 +- lib/fluent/plugin/out_s3.rb | 92 +++++++++++++++++++++++++++++-------- test/test_out_s3.rb | 8 ++-- 4 files changed, 79 insertions(+), 25 deletions(-) diff --git a/.gitignore b/.gitignore index 73ddea98..8f980920 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ vendor .ruby-version test/tmp/ + +docker/ diff --git a/VERSION b/VERSION index 3e1ad720..bc80560f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.5.0 \ No newline at end of file +1.5.0 diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index 956ed968..f3d8237a 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -87,6 +87,8 @@ def initialize config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead" desc "S3 bucket name" config_param :s3_bucket, :string + desc "Set bucket name fallback if fails fetching from placeholders" + config_param :s3_bucket_fallback, :string, :default => nil desc "S3 region name" config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1" desc "Use 's3_region' instead" @@ -249,11 +251,14 @@ def start s3_client = Aws::S3::Client.new(options) @s3 = Aws::S3::Resource.new(client: s3_client) - @bucket = @s3.bucket(@s3_bucket) - check_apikeys if @check_apikey_on_start - ensure_bucket if @check_bucket - ensure_bucket_lifecycle + @tag_placeholders = get_placeholders_tag(@s3_bucket) + @keys_placeholders = get_placeholders_keys(@s3_bucket) + @time_placeholders = ext_get_placeholders_time(@s3_bucket) + + if @tag_placeholders.empty? && @keys_placeholders.empty? && @time_placeholders.empty? + @bucket = create_bucket(@s3_bucket) + end super end @@ -263,7 +268,28 @@ def format(tag, time, record) @formatter.format(tag, time, r) end + def create_bucket(name) + bucket = @s3.bucket(name) + check_apikeys(bucket) if @check_apikey_on_start + ensure_bucket(bucket) if @check_bucket + ensure_bucket_lifecycle(bucket) + bucket + end + + def use_fallback(placeholder) + if !@s3_bucket_fallback + raise "It was not possible to extract '#{placeholder}' placeholder from chunk and @s3_bucket_fallback is not set." + end + log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name." + @s3_bucket_fallback + end + + def ext_get_placeholders_time(str) + output = [ "%S", "%M", "%H", "%d", "%m", "%Y" ].select { |tp| str.include? tp } + end + def write(chunk) + i = 0 metadata = chunk.metadata previous_path = nil @@ -273,6 +299,31 @@ def write(chunk) @time_slice_with_tz.call(metadata.timekey) end + bucket = @bucket ? @bucket : nil + + if (!bucket) && (!@tag_placeholders.empty?) + if (!chunk.metadata.tag) || ((@tag_placeholders.max + 1) > chunk.metadata.tag.split('.').length) + bucket = create_bucket(use_fallback("tag")) + end + end + + if !bucket + @keys_placeholders.each do |placeholder| + if (!chunk.metadata.variables) || (!chunk.metadata.variables.keys.include?(placeholder.to_sym)) + bucket = create_bucket(use_fallback(placeholder)) + break + end + end + end + + if (!bucket) && (!chunk.metadata.timekey) && @time_placeholders + bucket = create_bucket(use_fallback("time")) + end + + if !bucket + bucket = create_bucket(extract_placeholders(@s3_bucket, chunk)) + end + if @check_object begin @values_for_s3_object_chunk[chunk.unique_id] ||= { @@ -304,7 +355,7 @@ def write(chunk) i += 1 previous_path = s3path - end while @bucket.object(s3path).exists? + end while bucket.object(s3path).exists? else if @localtime hms_slicer = Time.now.strftime("%H%M%S") @@ -362,18 +413,19 @@ def write(chunk) put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)}) end end - @bucket.object(s3path).put(put_options) + bucket.object(s3path).put(put_options) @values_for_s3_object_chunk.delete(chunk.unique_id) if @warn_for_delay if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay - log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" + log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}" end end ensure tmp.close(true) rescue nil end + end private @@ -399,34 +451,34 @@ def timekey_to_timeformat(timekey) end end - def ensure_bucket - if !@bucket.exists? + def ensure_bucket(bucket) + if !bucket.exists? if @auto_create_bucket - log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}" - @s3.create_bucket(bucket: @s3_bucket) + log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}" + @s3.create_bucket(bucket: bucket.name) else - raise "The specified bucket does not exist: bucket = #{@s3_bucket}" + raise "The specified bucket does not exist: bucket = #{bucket.name}" end end end - def ensure_bucket_lifecycle + def ensure_bucket_lifecycle(bucket) unless @bucket_lifecycle_rules.empty? - old_rules = get_bucket_lifecycle_rules + old_rules = get_bucket_lifecycle_rules(bucket) new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule| { id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" } end unless old_rules == new_rules - log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}" - @bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } }) + log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}" + bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } }) end end end - def get_bucket_lifecycle_rules + def get_bucket_lifecycle_rules(bucket) begin - @bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule| + bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule| { id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] } end rescue Aws::S3::Errors::NoSuchLifecycleConfiguration @@ -461,8 +513,8 @@ def check_s3_path_safety(conf) end end - def check_apikeys - @bucket.objects(prefix: @path, :max_keys => 1).first + def check_apikeys(bucket) + bucket.objects(prefix: @path, :max_keys => 1).first rescue Aws::S3::Errors::NoSuchBucket # ignore NoSuchBucket Error because ensure_bucket checks it. rescue => e diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index c1530e62..f475d9fa 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -44,10 +44,10 @@ def write(chunk) private - def ensure_bucket + def ensure_bucket(bucket) end - def check_apikeys + def check_apikeys(bucket) end end.configure(conf) end @@ -287,7 +287,7 @@ def write(chunk) private - def check_apikeys + def check_apikeys(bucket) end end.configure(conf) end @@ -427,7 +427,7 @@ def setup_mocks(exists_return = false) mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client } @s3_resource = mock(Aws::S3::Resource.new(client: @s3_client)) mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource } - @s3_bucket = mock(Aws::S3::Bucket.new(name: "test", + @s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket", client: @s3_client)) @s3_bucket.exists? { exists_return } @s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",