Skip to content

Commit

Permalink
Add compress_batches feature
Browse files Browse the repository at this point in the history
As per the README updates, this can be used to compress a number of
input records into a single Pub/Sub message, therefore saving on costs.
  • Loading branch information
benwh committed May 17, 2020
1 parent 6146724 commit e0c92bd
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 24 deletions.
74 changes: 72 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Use `gcloud_pubsub` output plugin.
max_messages 1000
max_total_size 9800000
max_message_size 4000000
compress_batches false
<buffer>
@type memory
flush_interval 1s
Expand Down Expand Up @@ -92,7 +93,11 @@ Use `gcloud_pubsub` output plugin.
- `max_message_size` (optional, default: `4000000` = `4MB`)
- Messages exceeding `max_message_size` are not published because Pub/Sub clients cannot receive it.
- `attribute_keys` (optional, default: `[]`)
- Publishing the set fields as attributes.
- Extract these fields from the record and send them as attributes on the Pub/Sub message. Cannot be set if compress_batches is enabled.
- `metric_prefix` (optional, default: `fluentd_output_gcloud_pubsub`)
- The prefix for Prometheus metric names
- `compress_batches` (optional, default: `false`)
- If set to `true`, messages will be batched and compressed before publication. See [message compression](#message-compression) for details.

### Pull messages

Expand Down Expand Up @@ -147,18 +152,83 @@ Use `gcloud_pubsub` input plugin.
- `pull_threads` (optional, default: `1`)
- Set number of threads to pull messages.
- `attribute_keys` (optional, default: `[]`)
- Specify the key of the attribute to be emitted as the field of record.
- Acquire these fields from attributes on the Pub/Sub message and merge them into the record.
- `parse_error_action` (optional, default: `exception`)
- Set error type when parsing messages fails.
- `exception`: Raise exception. Messages are not acknowledged.
- `warning`: Only logging as warning.
- `metric_prefix` (optional, default: `fluentd_input_gcloud_pubsub`)
- The prefix for Prometheus metric names
- `enable_rpc` (optional, default: `false`)
- If `true` is specified, HTTP RPC to stop or start pulling message is enabled.
- `rpc_bind` (optional, default: `0.0.0.0`)
- Bind IP address for HTTP RPC.
- `rpc_port` (optional, default: `24680`)
- Port for HTTP RPC.

## Message compression

The `compress_batches` option can be used to enable the compression of messages
_before_ publication to Pub/Sub.

This works by collecting the buffered messages, taking up to `max_total_size` or
`max_message_size` input records, then compressing them with Zlib (i.e.
gzip/Deflate) before publishing them as a single message to the Pub/Sub topic.

When transporting large volumes of records via Pub/Sub, e.g. multiple Terabytes
per month, this can lead to significant cost savings, as typically the CPU time
required to compress the messages will be minimal in comparison to the Pub/Sub
costs.

The compression ratio achievable will vary largely depending on the homogeneity
of the input records, but typically will be 50% at the very minimum and often
around 80-90%.

In order to achieve good compression, consider the following:
- Ensure that the buffer is being filled with a reasonable batch of messages: do
not use `flush_mode immediate`, and keep the `flush_interval` value
sufficiently high. Use the Prometheus metrics to determine how many records
are being published per message.
- Keep the `max_messages` and `max_message_size` values high (the defaults are
optimal).
- If there are many different sources of messages being mixed and routed to a
single `gcloud_pubsub` output, use multiple outputs (which will each have
their own buffer) through tagging or [labelling][fluentd-labels].

[fluentd-labels]: https://docs.fluentd.org/quickstart/life-of-a-fluentd-event#labels

The receiving end must be able to decode these compressed batches of messages,
which it can determine via an attribute set on the Pub/Sub message. The
`gcloud_pubsub` input plugin will do this transparently, decompressing any
messages which contain a batch of records and normally processing any messages
which represent just a single record.
Therefore, as long as all of the receivers are updated with support for
compressed batches first, it's then possible to gradually roll out this feature.

## Prometheus metrics

The input and output plugins expose several metrics in order to monitor
performance:

- `fluentd_output_gcloud_pubsub_compression_enabled`
- Gauge: Whether compression/batching is enabled
- `fluentd_output_gcloud_pubsub_messages_published_per_batch`
- Histogram: Number of records published to Pub/Sub per buffer flush
- `fluentd_output_gcloud_pubsub_messages_published_bytes`
- Histogram: Total size in bytes of the records published to Pub/Sub
- `fluentd_output_gcloud_pubsub_messages_compression_duration_seconds`
- Histogram: Time taken to compress a batch of messages
- `fluentd_output_gcloud_pubsub_messages_compression_ratio`
- Histogram: Compression ratio achieved on a batch of messages

- `fluentd_input_gcloud_pubsub_pull_errors_total`
- Counter: Errors encountered while pulling or processing messages (split by a
`retryable` label)
- `fluentd_input_gcloud_pubsub_messages_pulled`
- Histogram: Number of Pub/Sub messages pulled by the subscriber on each invocation
- `fluentd_input_gcloud_pubsub_messages_pulled_bytes`
- Histogram: Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation

## Contributing

1. Fork it
Expand Down
2 changes: 2 additions & 0 deletions fluent-plugin-gcloud-pubsub-custom.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "prometheus-client", "< 0.10"

gem.add_development_dependency "bundler"
gem.add_development_dependency "pry"
gem.add_development_dependency "pry-byebug"
gem.add_development_dependency "rake"
gem.add_development_dependency "rubocop", "~>0.83"
gem.add_development_dependency "test-unit"
Expand Down
100 changes: 95 additions & 5 deletions lib/fluent/plugin/gcloud_pubsub/client.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "google/cloud/pubsub"
require "zlib"

module Fluent
module GcloudPubSub
Expand All @@ -9,6 +10,10 @@ class Error < StandardError
class RetryableError < Error
end

COMPRESSION_ALGORITHM_ZLIB = "zlib"
# 30 is the ASCII record separator character
BATCHED_RECORD_SEPARATOR = 30.chr

class Message
attr_reader :message, :attributes

Expand All @@ -27,10 +32,35 @@ def bytesize
end

class Publisher
def initialize(project, key, autocreate_topic)
def initialize(project, key, autocreate_topic, metric_prefix)
@pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key
@autocreate_topic = autocreate_topic
@topics = {}

@compression_ratio =
Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compression_ratio") do
::Prometheus::Client.registry.histogram(
:"#{metric_prefix}_messages_compression_ratio",
"Compression ratio achieved on a batch of messages",
{},
# We expect compression for even a single message to be typically
# above 2x, so bias the buckets towards the higher end of the
# range.
[0, 0.25, 0.5, 0.75, 0.85, 0.9, 0.95, 0.975, 1],
)
end

# rubocop:disable Layout/LineLength
@compression_duration =
Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compression_duration_seconds") do
::Prometheus::Client.registry.histogram(
:"#{metric_prefix}_messages_compression_duration_seconds",
"Time taken to compress a batch of messages",
{},
[0, 0.0001, 0.0005, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1],
)
end
# rubocop:enable Layout/LineLength
end

def topic(topic_name)
Expand All @@ -44,15 +74,54 @@ def topic(topic_name)
client
end

def publish(topic_name, messages)
topic(topic_name).publish do |batch|
messages.each do |m|
batch.publish m.message, m.attributes
def publish(topic_name, messages, compress_batches = false)
if compress_batches
topic(topic_name).publish(*compress_messages_with_zlib(messages, topic_name))
else
topic(topic_name).publish do |batch|
messages.each do |m|
batch.publish m.message, m.attributes
end
end
end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e
raise RetryableError, "Google api returns error:#{e.class} message:#{e}"
end

private

def compress_messages_with_zlib(messages, topic_name)
original_size = messages.sum(&:bytesize)
# This should never happen, only a programming error or major
# misconfiguration should lead to this situation. But checking against
# it here avoids a potential division by zero later on.
raise ArgumentError, "not compressing empty inputs" if original_size.zero?

# Here we're implicitly dropping the 'attributes' field of the messages
# that we're iterating over.
# This is fine, because the :attribute_keys config param is not
# supported when in compressed mode, so this field will always be
# empty.
packed_messages = messages.map(&:message).join(BATCHED_RECORD_SEPARATOR)

duration, compressed_messages = Fluent::GcloudPubSub::Metrics.measure_duration do
Zlib::Deflate.deflate(packed_messages)
end

@compression_duration.observe(
{ topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB },
duration,
)

compressed_size = compressed_messages.bytesize
@compression_ratio.observe(
{ topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB },
# If original = 1MiB and compressed = 256KiB; then metric value = 0.75 = 75% when plotted
1 - compressed_size.to_f / original_size,
)

[compressed_messages, { "compression_algorithm": COMPRESSION_ALGORITHM_ZLIB }]
end
end

class Subscriber
Expand All @@ -79,5 +148,26 @@ def acknowledge(messages)
raise RetryableError, "Google acknowledge api returns error:#{e.class} message:#{e}"
end
end

class MessageUnpacker
def self.unpack(message)
attributes = message.attributes
algorithm = attributes["compression_algorithm"]

case algorithm
when nil
# For an uncompressed message return the single line and attributes
[[message.message.data.chomp, message.attributes]]
when COMPRESSION_ALGORITHM_ZLIB
# Return all of the lines in the message, with empty attributes
Zlib::Inflate
.inflate(message.message.data)
.split(BATCHED_RECORD_SEPARATOR)
.map { |line| [line, {}] }
else
raise ArgumentError, "unknown compression algorithm: '#{algorithm}'"
end
end
end
end
end
10 changes: 10 additions & 0 deletions lib/fluent/plugin/gcloud_pubsub/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ def self.register_or_existing(metric_name)

yield
end

# Time the elapsed execution of the provided block, return the duration
# as the first element followed by the result of the block.
def self.measure_duration
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = yield
finish = Process.clock_gettime(Process::CLOCK_MONOTONIC)

[finish - start, *result]
end
end
end
end
30 changes: 16 additions & 14 deletions lib/fluent/plugin/in_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class FailedParseError < StandardError
config_param :return_immediately, :bool, default: true
desc "Set number of threads to pull messages."
config_param :pull_threads, :integer, default: 1
desc "Specify the key of the attribute to be acquired as a record"
desc "Acquire these fields from attributes on the Pub/Sub message and merge them into the record"
config_param :attribute_keys, :array, default: []
desc "Set error type when parsing messages fails."
config_param :parse_error_action, :enum, default: :exception, list: %i[exception warning]
Expand Down Expand Up @@ -263,21 +263,23 @@ def process(messages)
end

messages.each do |m|
line = m.message.data.chomp
attributes = m.attributes
@parser.parse(line) do |time, record|
if time && record
@attribute_keys.each do |key|
record[key] = attributes[key]
end
lines_attributes = Fluent::GcloudPubSub::MessageUnpacker.unpack(m)

lines_attributes.each do |line, attributes|
@parser.parse(line) do |time, record|
if time && record
@attribute_keys.each do |key|
record[key] = attributes[key]
end

event_streams[@extract_tag.call(record)].add(time, record)
else
case @parse_error_action
when :exception
raise FailedParseError, "pattern not match: #{line}"
event_streams[@extract_tag.call(record)].add(time, record)
else
log.warn "pattern not match", record: line
case @parse_error_action
when :exception
raise FailedParseError, "pattern not match: #{line}"
else
log.warn "pattern not match", record: line
end
end
end
end
Expand Down
28 changes: 25 additions & 3 deletions lib/fluent/plugin/out_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ class GcloudPubSubOutput < Output
config_param :max_total_size, :integer, default: 9_800_000 # 9.8MB
desc "Limit bytesize per message."
config_param :max_message_size, :integer, default: 4_000_000 # 4MB
desc "Publishing the set field as an attribute"
desc "Extract these fields from the record and send them as attributes on the Pub/Sub message. " \
"Cannot be set if compress_batches is enabled."
config_param :attribute_keys, :array, default: []
desc "The prefix for Prometheus metric names"
config_param :metric_prefix, :string, default: "fluentd_output_gcloud_pubsub"
desc "If set to `true`, messages will be batched and compressed before publication"
config_param :compress_batches, :bool, default: false

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
Expand All @@ -51,6 +54,15 @@ def configure(conf)
placeholder_validate!(:topic, @topic)
@formatter = formatter_create

if @compress_batches && !@attribute_keys.empty?
# The attribute_keys option is implemented by extracting keys from the
# record and setting them on the Pub/Sub message.
# This is not possible in compressed mode, because we're sending just a
# single Pub/Sub message that comprises many records, therefore the
# attribute keys would clash.
raise Fluent::ConfigError, ":attribute_keys cannot be used when compression is enabled"
end

@messages_published =
Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do
::Prometheus::Client.registry.histogram(
Expand All @@ -70,12 +82,22 @@ def configure(conf)
[100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000],
)
end

@compression_enabled =
Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_compression_enabled") do
::Prometheus::Client.registry.gauge(
:"#{@metric_prefix}_compression_enabled",
"Whether compression/batching is enabled",
{},
)
end
@compression_enabled.set(common_labels, @compress_batches ? 1 : 0)
end
# rubocop:enable Metrics/MethodLength

def start
super
@publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic
@publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @metric_prefix
end

def format(tag, time, record)
Expand Down Expand Up @@ -135,7 +157,7 @@ def publish(topic, messages)
@messages_published.observe(common_labels, messages.length)
@bytes_published.observe(common_labels, size)

@publisher.publish(topic, messages)
@publisher.publish(topic, messages, @compress_batches)
end

def common_labels
Expand Down
Loading

0 comments on commit e0c92bd

Please sign in to comment.