Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): Add Open Telemetry trace support #16373

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion google-cloud-pubsub/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Metrics/ClassLength:
Metrics/CyclomaticComplexity:
Max: 15
Metrics/MethodLength:
Max: 35
Max: 40
Metrics/PerceivedComplexity:
Max: 15
Naming/FileName:
Expand Down
1 change: 1 addition & 0 deletions google-cloud-pubsub/google-cloud-pubsub.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "concurrent-ruby", "~> 1.1"
gem.add_dependency "google-cloud-core", "~> 1.5"
gem.add_dependency "google-cloud-pubsub-v1", "~> 0.0"
gem.add_dependency "opentelemetry-sdk", "~> 1.0"
quartzmo marked this conversation as resolved.
Show resolved Hide resolved

gem.add_development_dependency "autotest-suffix", "~> 1.1"
gem.add_development_dependency "avro", "~> 1.10"
Expand Down
97 changes: 68 additions & 29 deletions google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require "google/cloud/pubsub/publish_result"
require "google/cloud/pubsub/service"
require "google/cloud/pubsub/convert"
require "opentelemetry"

module Google
module Cloud
Expand Down Expand Up @@ -105,6 +106,7 @@ def initialize topic_name,
@cond = new_cond
@flow_controller = FlowController.new(**@flow_control)
@thread = Thread.new { run_background }
@tracer = OpenTelemetry.tracer_provider.tracer "Google::Cloud::PubSub", Google::Cloud::PubSub::VERSION
end

##
Expand Down Expand Up @@ -133,30 +135,41 @@ def initialize topic_name,
#
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
begin
@flow_controller.acquire msg.to_proto.bytesize
rescue FlowControlLimitError => e
stop_publish ordering_key, e if ordering_key
raise
end
span_attrs = Convert.span_attributes topic_name, msg
span = @tracer.start_span "#{topic_name} send",
attributes: span_attrs,
kind: OpenTelemetry::Trace::SpanKind::PRODUCER

# TODO: message size in this span will be incorrect after propagation, below.
@tracer.in_span "#{topic_name} add to batch",
attributes: span_attrs,
kind: OpenTelemetry::Trace::SpanKind::PRODUCER do
propagate_span_in_message span, msg
begin
@flow_controller.acquire msg.to_proto.bytesize
rescue FlowControlLimitError => e
stop_publish ordering_key, e if ordering_key
raise
end

synchronize do
raise AsyncPublisherStopped if @stopped
raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string
synchronize do
raise AsyncPublisherStopped if @stopped
raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string

batch = resolve_batch_for_message msg
if batch.canceled?
@flow_controller.release msg.to_proto.bytesize
raise OrderingKeyError, batch.ordering_key
end
batch_action = batch.add msg, callback
if batch_action == :full
publish_batches!
elsif @published_at.nil?
# Set initial time to now to start the background counter
@published_at = Time.now
batch = resolve_batch_for_message msg
if batch.canceled?
@flow_controller.release msg.to_proto.bytesize
raise OrderingKeyError, batch.ordering_key
end
batch_action = batch.add msg, callback, span
if batch_action == :full
publish_batches!
elsif @published_at.nil?
# Set initial time to now to start the background counter
@published_at = Time.now
end
@cond.signal
end
@cond.signal
end

nil
Expand Down Expand Up @@ -296,6 +309,24 @@ def resume_publish ordering_key

protected

# Open Telemetry type used by TraceContext#inject to write "traceparent" context into the message attributes.
class TextMapSetter
# Writes key into a message protobuf.
def set msg, key, value
msg.attributes[key] = value
end
end

def propagate_span_in_message span, msg
return unless span.context.valid?
# Add span context to pubsub message attributes.
propagator = OpenTelemetry::Trace::Propagation::TraceContext.text_map_propagator
propagator.inject msg, setter: TextMapSetter.new

# Update the message size in the span attributes after adding span context to message attributes.
span.set_attribute "messaging.message_payload_size_bytes", msg.to_proto.bytesize
end

def run_background
synchronize do
until @stopped
Expand Down Expand Up @@ -355,9 +386,9 @@ def publish_batch_async topic_name, batch
# TODO: raise unless @publish_thread_pool.running?
return unless @publish_thread_pool.running?

Concurrent::Promises.future_on(
@publish_thread_pool, topic_name, batch
) { |t, b| publish_batch_sync t, b }
Concurrent::Promises.future_on @publish_thread_pool, topic_name, batch do |t, b|
publish_batch_sync t, b
end
end

# rubocop:disable Metrics/AbcSize
Expand All @@ -370,14 +401,22 @@ def publish_batch_sync topic_name, batch
items = batch.rebalance!

unless items.empty?
spans = items.map do |item|
span_attrs = Convert.span_attributes topic_name, item.msg
@tracer.start_span "#{topic_name} publish RPC",
attributes: span_attrs,
kind: OpenTelemetry::Trace::SpanKind::PRODUCER
end
grpc = @service.publish topic_name, items.map(&:msg)
spans.map(&:finish) # TODO: Move to ensure block?
items.zip Array(grpc.message_ids) do |item, id|
@flow_controller.release item.bytesize
next unless item.callback

item.msg.message_id = id
publish_result = PublishResult.from_grpc item.msg
execute_callback_async item.callback, publish_result
if item.callback
item.msg.message_id = id
publish_result = PublishResult.from_grpc item.msg
execute_callback_async item.callback, publish_result
end
item.span.finish # TODO: Move to ensure block?
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ def initialize publisher, ordering_key
# * `:full` - Batch is full and ready to be published, and the
# message is queued.
#
def add msg, callback
def add msg, callback, span
synchronize do
raise AsyncPublisherStopped if @stopping
raise OrderingKeyError, @ordering_key if @canceled

if @publishing
queue_add msg, callback
queue_add msg, callback, span
:queued
elsif try_add msg, callback
elsif try_add msg, callback, span
:added
else
queue_add msg, callback
queue_add msg, callback, span
:full
end
end
Expand Down Expand Up @@ -141,7 +141,7 @@ def rebalance!

until @queue.empty?
item = @queue.first
if try_add item.msg, item.callback
if try_add item.msg, item.callback, item.span
@queue.shift
next
end
Expand Down Expand Up @@ -180,7 +180,7 @@ def reset!

until @queue.empty?
item = @queue.first
added = try_add item.msg, item.callback
added = try_add item.msg, item.callback, item.span
break unless added
@queue.shift
end
Expand Down Expand Up @@ -257,16 +257,16 @@ def empty?

protected

def items_add msg, callback
item = Item.new msg, callback
def items_add msg, callback, span
item = Item.new msg, callback, span
@items << item
@total_message_bytes += item.bytesize + 2
end

def try_add msg, callback
def try_add msg, callback, span
if @items.empty?
# Always add when empty, even if bytesize is bigger than total
items_add msg, callback
items_add msg, callback, span
return true
end
new_message_count = total_message_count + 1
Expand All @@ -275,12 +275,12 @@ def try_add msg, callback
new_message_bytes >= @publisher.max_bytes
return false
end
items_add msg, callback
items_add msg, callback, span
true
end

def queue_add msg, callback
item = Item.new msg, callback
def queue_add msg, callback, span
item = Item.new msg, callback, span
@queue << item
end

Expand All @@ -292,7 +292,7 @@ def total_message_bytes
@total_message_bytes
end

Item = Struct.new :msg, :callback do
Item = Struct.new :msg, :callback, :span do
def bytesize
msg.to_proto.bytesize
end
Expand Down
11 changes: 11 additions & 0 deletions google-cloud-pubsub/lib/google/cloud/pubsub/convert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ def pubsub_message data, attributes, ordering_key, extra_attrs
ordering_key: ordering_key
)
end

def span_attributes topic_name, msg
{
"messaging.system" => "pubsub",
"messaging.destination" => topic_name,
"messaging.destination_kind" => "topic",
"messaging.message_id" => msg.message_id,
"messaging.message_payload_size_bytes" => msg.to_proto.bytesize,
"pubsub.ordering_key" => msg.ordering_key
}
end
end

extend ClassMethods
Expand Down
Loading