
Added batch span processor with test coverage #195

Merged: 9 commits merged into master on Aug 4, 2020

Conversation

@snehilchopra (Contributor) commented Jul 21, 2020

This PR contains the implementation of a Batch Span Processor, as mentioned in #83.
I used the implementations in Python, Java, and Go as references.

However, there are quite a few discrepancies between those implementations. For instance, Go does not have a ForceFlush method, and
Python has concurrency issues. Java, on the other hand, is well synchronized: whenever the queue is manipulated, locks guard it.

I have added comments in the code to give better context on the problems I observed.

Please let me know your thoughts on them, thanks!
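For context, a rough outline of the processor's shape as discussed in this thread; this is an illustrative sketch only, with member names taken from the conversation below rather than copied from the diff (the real interface lives in sdk/include/opentelemetry/sdk/trace/batch_span_processor.h):

    // Illustrative outline only; SpanProcessor, Recordable, SpanExporter and
    // common::CircularBuffer come from the SDK headers.
    class BatchSpanProcessor : public SpanProcessor
    {
    public:
      // Called when a span ends; the span is pushed into the circular buffer,
      // and the worker thread is nudged if the buffer is filling up.
      void OnEnd(std::unique_ptr<Recordable> &&span) noexcept override;

      // Wakes the worker thread and blocks until buffered spans are exported.
      void ForceFlush(std::chrono::microseconds timeout) noexcept override;

      // Signals the worker thread, joins it, and shuts down the exporter.
      void Shutdown(std::chrono::microseconds timeout) noexcept override;

    private:
      void DoBackgroundWork();  // worker-thread loop

      std::unique_ptr<SpanExporter> exporter_;
      std::unique_ptr<common::CircularBuffer<Recordable>> buffer_;
      std::thread worker_thread_;
      std::condition_variable cv_;
      std::mutex cv_m_;
      std::atomic<bool> is_shutdown_{false};
      std::atomic<bool> is_force_flush_{false};
    };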

@snehilchopra requested a review from a team on July 21, 2020 03:12
@linux-foundation-easycla bot commented Jul 21, 2020

CLA Check

@snehilchopra force-pushed the batch_span_processor branch from 8ffce98 to 427fff6 on July 21, 2020 05:08
@codecov bot commented Jul 22, 2020

Codecov Report

Merging #195 into master will increase coverage by 2.03%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master     #195      +/-   ##
==========================================
+ Coverage   92.19%   94.22%   +2.03%     
==========================================
  Files         117      115       -2     
  Lines        4038     3882     -156     
==========================================
- Hits         3723     3658      -65     
+ Misses        315      224      -91     
Impacted Files Coverage Δ
...clude/opentelemetry/sdk/common/atomic_unique_ptr.h 100.00% <ø> (ø)
...include/opentelemetry/sdk/common/circular_buffer.h 100.00% <ø> (ø)
...e/opentelemetry/sdk/common/circular_buffer_range.h 100.00% <ø> (ø)
sdk/test/common/atomic_unique_ptr_test.cc 100.00% <ø> (ø)
sdk/test/common/circular_buffer_range_test.cc 100.00% <ø> (ø)
sdk/test/common/circular_buffer_test.cc 100.00% <ø> (ø)
sdk/include/opentelemetry/sdk/trace/span_data.h 98.33% <100.00%> (ø)
sdk/src/metrics/meter_provider.cc 100.00% <0.00%> (ø)
api/test/metrics/noop_metrics_test.cc
api/include/opentelemetry/metrics/meter.h
... and 2 more

@snehilchopra force-pushed the batch_span_processor branch 2 times, most recently from fe416e4 to bc5f67f on July 22, 2020 17:57
@snehilchopra (Contributor, Author) commented Jul 22, 2020

/check-cla

@snehilchopra force-pushed the batch_span_processor branch 2 times, most recently from 84e9684 to 696939c on July 23, 2020 01:35
@snehilchopra force-pushed the batch_span_processor branch from 696939c to a8ac7a1 on July 23, 2020 02:12

// If the queue gets at least half full, a preemptive notification is
// sent to the worker thread to start a new export cycle.
if (static_cast<int>(buffer_->size()) >= max_queue_size_ / 2)
{

A reviewer commented:

Is it possible for the notification to be missed here? Would it make sense to make this a loop, like in other places where a notification is sent?

@snehilchopra (Contributor, Author) replied:

Since this wasn't stated explicitly anywhere in the spec, I was wondering if it was fine to leave this as a hit-or-miss notify call. All it does is attempt a preemptive notification for better performance; it is not a hard requirement.

But I think when there are many producer threads, this sort of preemption might become necessary to minimize the number of spans dropped.

Please let me know what you think about this!

@snehilchopra (Contributor, Author) added:

@reyang @pyohannes
It'd be great to get your feedback too on this!

The reviewer replied:

Ok, if it is not a hard requirement, I think it is fine to leave it as a single attempt to wake up.

A Member commented:

It seems to be a good idea to me.
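For readers following the missed-notification discussion: a notify_one issued while the worker thread is not yet blocked in a wait is simply dropped, and a predicate-based wait makes a single missed notification harmless, which is why a single attempt to wake up is acceptable here. A minimal self-contained sketch (names hypothetical, not the PR's code):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    std::mutex m;
    std::condition_variable cv;
    bool work_pending = false;  // predicate the worker re-checks

    void producer()
    {
      {
        std::lock_guard<std::mutex> lk(m);
        work_pending = true;  // state change happens under the lock...
      }
      cv.notify_one();        // ...so the worker sees the flag even if this
                              // notify fires before the worker starts waiting
    }

    void worker()
    {
      std::unique_lock<std::mutex> lk(m);
      // With a predicate, a notify that arrived "too early" is not lost: the
      // flag is already true and wait_for returns immediately. A bare
      // wait_for would sleep for the full timeout instead.
      cv.wait_for(lk, std::chrono::milliseconds(5000), [] { return work_pending; });
      work_pending = false;
    }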

@snehilchopra force-pushed the batch_span_processor branch from 7addf7e to eab7d98 on July 23, 2020 20:23
@IlyaKobelevskiy left a comment:

I think overall this looks good!

@snehilchopra force-pushed the batch_span_processor branch from eab7d98 to 1f0bc9c on July 23, 2020 22:22
@ZiweiZhao commented:
LGTM!

/**
 * Class destructor which invokes the Shutdown() method. The Shutdown() method is supposed to be invoked
 * when the Tracer is shutdown (as per other languages), but the C++ Tracer only takes shared ownership of the processor,
 * and thus doesn't call Shutdown (as the processor might be shared with other Tracers).
 */
A Member replied:

FYI @cijothomas @rajkumar-rangaraj

I think @snehilchopra is the first one doing this with the correct consideration of ref counting.

@snehilchopra force-pushed the batch_span_processor branch from 500c646 to 381d435 on July 26, 2020 05:28
@snehilchopra force-pushed the batch_span_processor branch 9 times, most recently from 9efe9c5 to fdba096 on July 31, 2020 23:34
@snehilchopra (Contributor, Author) commented:

@pyohannes Can I please get a final stamp on this?

@snehilchopra force-pushed the batch_span_processor branch from fdba096 to 9bf95d8 on August 1, 2020 01:03
@pyohannes (Contributor) left a comment:

I have some more remarks. In addition to those, I'd like to propose a simplification for the Shutdown and DoBackgroundWork methods:

    void BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
    {
      is_shutdown_ = true;

      cv_.notify_one();
      worker_thread_.join();

      exporter_->Shutdown();
    }

    void BatchSpanProcessor::DoBackgroundWork()
    {
      while (true)
      {
        std::unique_lock<std::mutex> lk(cv_m_);

        cv_.wait_for(lk, schedule_delay_millis_);

        if (is_shutdown_.load() == true)
        {
          DrainQueue();
          return;
        }

        bool was_force_flush_called = is_force_flush_.load();

        if (was_force_flush_called)
        {
          is_force_flush_ = false;
        }
        else if (buffer_.empty())
        {
          continue;
        }

        Export(was_force_flush_called);
      }
    }

This passes all the tests. The only drawback is that the Shutdown call can take a bit longer, but it simplifies the solution quite a bit, and I think it actually makes it easier to implement a stable timeout mechanism on top of it (maybe in another PR).
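For completeness, a hedged sketch of how ForceFlush could sit on top of this simplified loop, using the force_flush_cv_m_ mutex quoted just below; the force_flush_cv_ member and the assumption that the worker notifies it after the corresponding Export(...) call are illustrative, not the merged code:

    void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
    {
      std::unique_lock<std::mutex> lk(force_flush_cv_m_);
      is_force_flush_ = true;
      cv_.notify_one();  // wake the worker out of its wait_for

      // Block until the worker has picked up the flush request and exported;
      // re-checking the flag guards against spurious wakeups.
      force_flush_cv_.wait(lk, [this] { return !is_force_flush_.load(); });
    }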

std::mutex cv_m_, force_flush_cv_m_;

/* The buffer/queue to which the ended spans are added */
std::unique_ptr<common::CircularBuffer<Recordable>> buffer_;
@pyohannes (Contributor) commented:

It doesn't need to be a pointer. If the batch span processor is allocated on the heap, the buffer will be too.
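In other words, something like the following (a sketch of the suggestion, not the merged code):

    // Held by value: the buffer lives and dies with the processor, with no
    // extra heap allocation and no possible null state.
    common::CircularBuffer<Recordable> buffer_;

    // Constructed in the constructor's initializer list, e.g.:
    //   BatchSpanProcessor(..., size_t max_queue_size)
    //       : buffer_(max_queue_size) { ... }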

{
// If we already have max_export_batch_size_ spans in the buffer, better to export them
// now
if (buffer_->size() < max_export_batch_size_)
@pyohannes (Contributor) commented:

I tried to understand the effect of this statement.

If lots of spans are constantly added to the buffer (and/or if max_export_batch_size_ is very small) this statement might always evaluate to false, and thus we'll never reach the line with the cv_.wait_for. So we won't honor the schedule_delay_millis_ under certain critical circumstances.
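To make the failure mode concrete, here is a condensed sketch of the loop shape being discussed (variable names from the thread, control flow reduced to the relevant branch):

    while (!is_shutdown_.load())
    {
      // Under sustained load, buffer_->size() can stay at or above
      // max_export_batch_size_ indefinitely, so this branch always runs...
      if (buffer_->size() >= max_export_batch_size_)
      {
        Export(false);
        continue;  // ...and the wait below is never reached,
      }            // so schedule_delay_millis_ is never honored.

      std::unique_lock<std::mutex> lk(cv_m_);
      cv_.wait_for(lk, schedule_delay_millis_);
    }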

@snehilchopra (Contributor, Author) replied:

Sure, I can change it. It was just an effort to improve performance, using Java's processor implementation as a reference.

auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

timeout = schedule_delay_millis_ - duration;
@pyohannes (Contributor) commented:

I think we can get rid of this calculation and just go with waiting for the whole schedule_delay_millis_ in each iteration. This would be more in line with the spec, which says:

the delay interval in milliseconds between two consecutive exports

@snehilchopra (Contributor, Author) replied:

I was under the impression that it meant the delay interval between when two consecutive exports begin.
I believe Python has the same logic too.

@pyohannes (Contributor) replied:

That's a good point. I had a look at Java, which doesn't have it.

I was drawn to the simpler solution, but I leave this up to you to decide.
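The two readings of the spec differ only in how the next wait is computed; a side-by-side sketch of both (member names from the thread, not exact code from either SDK):

    // Reading 1 (Java, and the simpler option): a fixed delay between the
    // end of one export and the start of the next.
    cv_.wait_for(lk, schedule_delay_millis_);

    // Reading 2 (Python, and this PR): a fixed interval between the starts
    // of two consecutive exports, subtracting the time the export took.
    auto start = std::chrono::steady_clock::now();
    Export(false);
    auto took  = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start);
    cv_.wait_for(lk, schedule_delay_millis_ - took);  // a negative remainder
                                                      // returns immediately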

@snehilchopra (Contributor, Author) commented:

> I have some more remarks. In addition to those, I'd like to propose a simplification for the Shutdown and DoBackgroundWork methods: […] (proposed code and remarks quoted above)

Thank you so much for the simplification, Johannes!

Around two weeks ago my initial implementation was along the same lines, except that we ran into the case where the condition variable's notify calls were missed by the worker thread, causing the Shutdown() test to take schedule_delay_millis milliseconds to complete.

To avoid this, we wrapped the notify calls in while loops, with boolean flags checking whether the notify call had been received.

If it's permissible for the Shutdown() method's notify calls to be missed, I can make the change right away.
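The while-loop workaround described above can be sketched like this (the shutdown_ack_ flag is an illustrative std::atomic<bool> member, not the PR's exact code): the sender keeps re-notifying until the worker confirms it saw the shutdown flag.

    // Sender side (Shutdown): retry notify_one until the worker acknowledges.
    is_shutdown_ = true;
    while (!shutdown_ack_.load())
    {
      cv_.notify_one();
      std::this_thread::yield();  // give the worker a chance to run
    }
    worker_thread_.join();

    // Worker side (DoBackgroundWork), right after waking from wait_for:
    if (is_shutdown_.load())
    {
      shutdown_ack_ = true;  // unblocks the sender's retry loop
      DrainQueue();
      return;
    }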

@pyohannes (Contributor) replied:

> If it's permissible for the Shutdown() method's notify calls to be missed, I can make the change right away.

I'm ok with that. I think invocations of Shutdown are not performance critical. But I'm open to any other opinions on this.

@snehilchopra (Contributor, Author) replied:

> I'm ok with that. I think invocations of Shutdown are not performance critical. But I'm open to any other opinions on this.

Sure. I'll change it for implementation simplicity.

It's just that there are similar issues in Python's implementation, where many notify calls are missed, so we tried to address those concerns in the C++ implementation as much as possible.

@snehilchopra force-pushed the batch_span_processor branch from 47a9fb1 to 61516a3 on August 2, 2020 00:28
@snehilchopra force-pushed the batch_span_processor branch from 0b49229 to 6d5eb05 on August 4, 2020 00:30
@reyang merged commit f0789be into open-telemetry:master on Aug 4, 2020
@snehilchopra deleted the batch_span_processor branch on August 5, 2020 17:46
Labels: pr:please-merge (this PR is ready to be merged by a Maintainer: rebased, CI passed, has enough valid approvals, etc.)

5 participants