Add Prometheus metrics
Augment the existing operations with Prometheus metrics in order to
provide observability around the operations that the plugin is
performing.

Introduce a new metrics helper to prevent attempting to register the
same metric more than once in a multi-threaded or multi-instance
context.
benwh committed May 19, 2020
1 parent ad03786 commit 265e819
Showing 5 changed files with 121 additions and 1 deletion.
22 changes: 22 additions & 0 deletions README.md
@@ -93,6 +93,8 @@ Use `gcloud_pubsub` output plugin.
  - Messages exceeding `max_message_size` are not published because Pub/Sub clients cannot receive them.
- `attribute_keys` (optional, default: `[]`)
  - Publishing the set fields as attributes.
- `metric_prefix` (optional, default: `fluentd_output_gcloud_pubsub`)
  - The prefix for Prometheus metric names

### Pull messages

@@ -152,13 +154,33 @@ Use `gcloud_pubsub` input plugin.
  - Set error type when parsing messages fails.
    - `exception`: Raise exception. Messages are not acknowledged.
    - `warning`: Only log a warning.
- `metric_prefix` (optional, default: `fluentd_input_gcloud_pubsub`)
  - The prefix for Prometheus metric names
- `enable_rpc` (optional, default: `false`)
  - If `true` is specified, HTTP RPC to stop or start pulling messages is enabled.
- `rpc_bind` (optional, default: `0.0.0.0`)
  - Bind IP address for HTTP RPC.
- `rpc_port` (optional, default: `24680`)
  - Port for HTTP RPC.

## Prometheus metrics

The input and output plugins expose several metrics in order to monitor
performance:

- `fluentd_output_gcloud_pubsub_messages_published_per_batch`
  - Histogram: Number of records published to Pub/Sub per buffer flush
- `fluentd_output_gcloud_pubsub_messages_published_bytes`
  - Histogram: Total size in bytes of the records published to Pub/Sub

- `fluentd_input_gcloud_pubsub_pull_errors_total`
  - Counter: Errors encountered while pulling or processing messages (split by a
    `retryable` label)
- `fluentd_input_gcloud_pubsub_messages_pulled`
  - Histogram: Number of Pub/Sub messages pulled by the subscriber on each invocation
- `fluentd_input_gcloud_pubsub_messages_pulled_bytes`
  - Histogram: Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation

## Contributing

1. Fork it
3 changes: 3 additions & 0 deletions fluent-plugin-gcloud-pubsub-custom.gemspec
@@ -19,6 +19,9 @@ Gem::Specification.new do |gem|
  gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"]
  gem.add_runtime_dependency "google-cloud-pubsub", "~> 0.30.0"

  # Use the same version constraint as fluent-plugin-prometheus currently specifies
  gem.add_runtime_dependency "prometheus-client", "< 0.10"

  gem.add_development_dependency "bundler"
  gem.add_development_dependency "rake"
  gem.add_development_dependency "rubocop", "~>0.83"
14 changes: 14 additions & 0 deletions lib/fluent/plugin/gcloud_pubsub/metrics.rb
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module Fluent
  module GcloudPubSub
    # Utilities for interacting with Prometheus metrics
    module Metrics
      def self.register_or_existing(metric_name)
        return ::Prometheus::Client.registry.get(metric_name) if ::Prometheus::Client.registry.exist?(metric_name)

        yield
      end
    end
  end
end
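
To illustrate how `register_or_existing` is meant to behave, here is a minimal sketch that runs without the `prometheus-client` gem. `FakeRegistry` is a hypothetical stand-in for the real registry, and the helper takes the registry as an explicit argument, which is an assumption made only so the example is self-contained. The helper in the commit uses `::Prometheus::Client.registry` directly.

```ruby
# Hypothetical stand-in for the Prometheus registry, for illustration only.
class FakeRegistry
  def initialize
    @metrics = {}
  end

  def exist?(name)
    @metrics.key?(name)
  end

  def get(name)
    @metrics[name]
  end

  # Raises on duplicate names, which is the failure the helper tries to avoid.
  def register(name, metric)
    raise ArgumentError, "#{name} has already been registered" if exist?(name)

    @metrics[name] = metric
  end
end

# Same shape as the helper in this commit, with the registry passed in
# explicitly (an assumption of this sketch).
def register_or_existing(registry, metric_name)
  return registry.get(metric_name) if registry.exist?(metric_name)

  yield
end

registry = FakeRegistry.new
name = :fluentd_input_gcloud_pubsub_messages_pulled

# Two plugin instances configuring the same metric name end up sharing one object.
first = register_or_existing(registry, name) { registry.register(name, Object.new) }
second = register_or_existing(registry, name) { registry.register(name, Object.new) }

puts first.equal?(second)
```

Note that the existence check and the registration are two separate steps, so in this sketch the pattern is not atomic on its own: two threads could both pass the check before either registers.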
46 changes: 46 additions & 0 deletions lib/fluent/plugin/in_gcloud_pubsub.rb
@@ -7,6 +7,7 @@
require "fluent/plugin/parser"

require "fluent/plugin/gcloud_pubsub/client"
require "fluent/plugin/gcloud_pubsub/metrics"

module Fluent::Plugin
  class GcloudPubSubInput < Input
@@ -43,6 +44,8 @@ class FailedParseError < StandardError
    config_param :attribute_keys, :array, default: []
    desc "Set error type when parsing messages fails."
    config_param :parse_error_action, :enum, default: :exception, list: %i[exception warning]
    desc "The prefix for Prometheus metric names"
    config_param :metric_prefix, :string, default: "fluentd_input_gcloud_pubsub"
    # for HTTP RPC
    desc "If `true` is specified, HTTP RPC to stop or start pulling message is enabled."
    config_param :enable_rpc, :bool, default: false
@@ -104,6 +107,7 @@ def process(req, _res)
      end
    end

    # rubocop:disable Metrics/MethodLength
    def configure(conf)
      compat_parameters_convert(conf, :parser)
      super
@@ -118,7 +122,37 @@ def configure(conf)
      end

      @parser = parser_create

      @messages_pulled =
        Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled") do
          ::Prometheus::Client.registry.histogram(
            :"#{@metric_prefix}_messages_pulled",
            "Number of Pub/Sub messages pulled by the subscriber on each invocation",
            {},
            [0, 1, 10, 50, 100, 250, 500, 1000],
          )
        end

      @messages_pulled_bytes =
        Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled_bytes") do
          ::Prometheus::Client.registry.histogram(
            :"#{@metric_prefix}_messages_pulled_bytes",
            "Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation",
            {},
            [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000],
          )
        end

      @pull_errors =
        Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_pull_errors_total") do
          ::Prometheus::Client.registry.counter(
            :"#{@metric_prefix}_pull_errors_total",
            "Errors encountered while pulling or processing messages",
            {},
          )
        end
    end
    # rubocop:enable Metrics/MethodLength

def start
super
@@ -199,18 +233,26 @@ def subscribe

    def _subscribe
      messages = @subscriber.pull @return_immediately, @max_messages
      @messages_pulled.observe(common_labels, messages.size)
      if messages.empty?
        log.debug "no messages are pulled"
        return
      end

      messages_size = messages.sum do |message|
        message.data.bytesize + message.attributes.sum { |k, v| k.bytesize + v.bytesize }
      end
      @messages_pulled_bytes.observe(common_labels, messages_size)

      process messages
      @subscriber.acknowledge messages

      log.debug "#{messages.length} message(s) processed"
    rescue Fluent::GcloudPubSub::RetryableError => e
      @pull_errors.increment(common_labels.merge({ retryable: true }))
      log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s
    rescue StandardError => e
      @pull_errors.increment(common_labels.merge({ retryable: false }))
      log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s
      log.error_backtrace e.backtrace
    end
@@ -249,5 +291,9 @@ def process(messages)
        end
      end
    end

    def common_labels
      { subscription: @subscription }
    end
  end
end
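
The byte count recorded by `_subscribe` adds each message's payload size to the sizes of its attribute keys and values. A minimal sketch of that calculation follows. `FakeMessage` and `total_bytes` are hypothetical names used only for illustration; the real message objects come from the `google-cloud-pubsub` gem.

```ruby
# Hypothetical stand-in for a pulled Pub/Sub message, exposing only the two
# readers that the size calculation relies on.
FakeMessage = Struct.new(:data, :attributes)

# Same arithmetic as in _subscribe: payload bytes plus the bytes of every
# attribute key and value, summed over all messages in the pull.
def total_bytes(messages)
  messages.sum do |message|
    message.data.bytesize + message.attributes.sum { |k, v| k.bytesize + v.bytesize }
  end
end

messages = [
  FakeMessage.new("hello", { "k" => "vv" }), # 5 payload bytes + 3 attribute bytes
  FakeMessage.new("h\u00e9llo", {}),         # 6 bytes: the accented letter takes 2 in UTF-8
]

puts total_bytes(messages)
```

Because `bytesize` is used rather than `length`, multi-byte characters count by their encoded size, which matches what is sent over the wire more closely than a character count would.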
37 changes: 36 additions & 1 deletion lib/fluent/plugin/out_gcloud_pubsub.rb
@@ -2,7 +2,9 @@

require "fluent/plugin/output"
require "fluent/plugin/gcloud_pubsub/client"
require "fluent/plugin/gcloud_pubsub/metrics"
require "fluent/plugin_helper/inject"
require "prometheus/client"

module Fluent::Plugin
  class GcloudPubSubOutput < Output
@@ -31,6 +33,8 @@ class GcloudPubSubOutput < Output
    config_param :max_message_size, :integer, default: 4_000_000 # 4MB
    desc "Publishing the set field as an attribute"
    config_param :attribute_keys, :array, default: []
    desc "The prefix for Prometheus metric names"
    config_param :metric_prefix, :string, default: "fluentd_output_gcloud_pubsub"

    config_section :buffer do
      config_set_default :@type, DEFAULT_BUFFER_TYPE
@@ -40,12 +44,34 @@ class GcloudPubSubOutput < Output
      config_set_default :@type, DEFAULT_FORMATTER_TYPE
    end

    # rubocop:disable Metrics/MethodLength
    def configure(conf)
      compat_parameters_convert(conf, :buffer, :formatter)
      super
      placeholder_validate!(:topic, @topic)
      @formatter = formatter_create

      @messages_published =
        Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do
          ::Prometheus::Client.registry.histogram(
            :"#{@metric_prefix}_messages_published_per_batch",
            "Number of records published to Pub/Sub per buffer flush",
            {},
            [1, 10, 50, 100, 250, 500, 1000],
          )
        end

      @bytes_published =
        Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_bytes") do
          ::Prometheus::Client.registry.histogram(
            :"#{@metric_prefix}_messages_published_bytes",
            "Total size in bytes of the records published to Pub/Sub",
            {},
            [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000],
          )
        end
    end
    # rubocop:enable Metrics/MethodLength

    def start
      super
@@ -103,8 +129,17 @@ def write(chunk)
    private

    def publish(topic, messages)
      log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}"
      size = messages.map(&:bytesize).inject(:+)
      log.debug "send message topic:#{topic} length:#{messages.length} size:#{size}"

      @messages_published.observe(common_labels, messages.length)
      @bytes_published.observe(common_labels, size)

      @publisher.publish(topic, messages)
    end

    def common_labels
      { topic: @topic }
    end
  end
end
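
One detail of the `publish` hunk worth knowing when reading it: `inject(:+)` returns `nil` for an empty array, whereas `sum` returns `0`. Whether `publish` can ever be called with an empty batch is not visible in this diff, so the sketch below only shows the difference between the two idioms. The method names `batch_size_inject` and `batch_size_sum` are hypothetical.

```ruby
# Mirrors the expression used in publish: nil when the batch is empty.
def batch_size_inject(messages)
  messages.map(&:bytesize).inject(:+)
end

# Alternative idiom: always returns an Integer, 0 for an empty batch.
def batch_size_sum(messages)
  messages.sum(&:bytesize)
end

batch = %w[ab cde]

puts batch_size_inject(batch)        # both idioms agree on a non-empty batch
puts batch_size_sum(batch)
puts batch_size_inject([]).inspect   # nil, not 0
puts batch_size_sum([])
```

If an empty batch were possible, the `sum` form would hand the histogram a number rather than `nil`.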
