-
Notifications
You must be signed in to change notification settings - Fork 443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added changes for async callback mechanism from Processor to Exporter #1252
Conversation
|
@DebajitDas, Thanks for the PR. |
you may use |
Codecov Report
@@ Coverage Diff @@
## main #1252 +/- ##
==========================================
- Coverage 92.31% 92.24% -0.06%
==========================================
Files 198 202 +4
Lines 7281 7422 +141
==========================================
+ Hits 6721 6846 +125
- Misses 560 576 +16
|
@@ -143,19 +143,19 @@ class Session : public opentelemetry::ext::http::client::Session | |||
} | |||
|
|||
virtual void SendRequest( | |||
opentelemetry::ext::http::client::EventHandler &callback) noexcept override | |||
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we transfer the callback ownership to http_client using unique_ptr, if that helps avoid shared_ptr during the hot path of request processing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving the callback ownership to http_client, would involve storing the unique_ptr to some kind of map (similar to session) and clearing the entry in the map when the async thread completes. But I could not figure out the simplest way to let know http_client that the async thread is completed and map needs to be cleaned. However, cleanup could be achieved by running a separate cleanup thread which cleans up on getting notified by async thread.
Do you have anything else in mind?
The other approach I was thinking was to move the ownership to the lambda function defined in Session::SendRequest, because this lambda function would run in async thread and callback should be valid till lambda function ends. This could have been doable in C++14 as it supports std::move in lambda capture. But I could not think of achieving the same in C++11.
Also, since we have maintained two versions of Export implementation in es_log_exporter.cc, the scope of Export outlives the async thread and in another version the async thread outlives the scope of Export, unique_ptr needs to outlive both of them.
Let me know your thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking whether to add a background thread for curl::HttpClient
to deal with resources of async HTTP call before we have a global event loop.
We will also need a thread to call curl_multi_perform
when I start to implement the real async version of curl::Session::SendRequest
.
With a background worker thread. I think we can allocate the unique_ptr on stack in sync Export and move it into the background worker thread in the async Export.
This could have been doable in C++14 as it supports std::move in lambda capture. But I could not think of achieving the same in C++11.
We can use parameter to move unique_ptr in C++11. Here is a example https://godbolt.org/z/T5sG7K9fn
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use parameter to move unique_ptr in C++11. Here is a example https://godbolt.org/z/T5sG7K9fn
Thanks @owent for sharing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, since we have maintained two versions of Export implementation in es_log_exporter.cc, the scope of Export outlives the async thread and in another version the async thread outlives the scope of Export, unique_ptr needs to outlive both of them.
@owent But according to above reasoning, I think we can start with shared_ptr for now.
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback) | ||
noexcept | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For exporters not supporting async, we can internally have these methods call sync interface along with logging a warning. Something like:
void ZipkinExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback)
noexcept
{
log_warning("async not supported. Making sync interface call");
auto status = Export(spans);
result_callback(status);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as per review comments.
sdk/src/logs/batch_log_processor.cc
Outdated
@@ -151,19 +153,28 @@ void BatchLogProcessor::Export(const bool was_force_flush_called) | |||
return true; | |||
}); | |||
}); | |||
if (is_export_async_ == false || was_force_flush_called == true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think async export should also be called in case of force flush. The callback should do the notify in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed on force flush. What is your thoughts in case of Shutdown?
Since shutdown is called during destructor, do we need use async export during shutdown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since shutdown is called during destructor, do we need use async export during shutdown.
Yes, we should do async export always if the user has selected that option irrespective of shutdown or force-flush. Do you see any issue doing that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we wait util all data to be exported when we call ForceFlush or Shutdown when in async mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed @owent, we should wait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have create #1274 to merge "Cocurrency otlp http session" into async-changes .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lalitb Could you please review #1274 or #1275 and merge any of them into async-changes first. And then we can continue to rebase the other one.
@DebajitDas I think it's fine to merge any of these two PRs first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Determines whether the export happens asynchronously. | ||
* Default implementation is synchronous. | ||
*/ | ||
bool is_export_async = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the async interface also added for simple_span_processor/simple_log_processor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed on this, will make the change.
break; | ||
case http_client::SessionState::NetworkError: | ||
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Network error to elasticsearch"); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing CreateFailed
and Cancelled
? Should we add default: break;
to avoid warnings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will add this today
std::unique_ptr<ResponseHandler> handler(new ResponseHandler(options_.console_debug)); | ||
session->SendRequest(*handler); | ||
auto handler = std::make_shared<ResponseHandler>(options_.console_debug); | ||
session->SendRequest(handler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
// Store the body of the request | ||
body_ = std::string(response.GetBody().begin(), response.GetBody().end()); | ||
if (body_.find("\"failed\" : 0") == std::string::npos) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we check response code instead of searching string to detect failure here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this part of code is actually called when the response is a success ( 200 OK). The original implementor of this exporter knows best what error it is further trying to catch from the response body ( or need to look into elastic search API details). :)
if (body_.find("\"failed\" : 0") == std::string::npos) | ||
{ | ||
OTEL_INTERNAL_LOG_ERROR( | ||
"[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: " | |
"[ES Log Exporter] Logs were not written to Elasticsearch correctly, response body: " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will incorporate this change.
switch (state) | ||
{ | ||
case http_client::SessionState::ConnectFailed: | ||
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed"); | |
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Connection to Elasticsearch failed"); |
And similar cases on this switch statement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will incorporate this change.
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback_; | ||
|
||
// A string to store the response body | ||
std::string body_ = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::string body_ = ""; | |
std::string body_; |
Utilize the default ctor which creates empty string as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will make the change.
{ | ||
// Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified | ||
// in URI | ||
body += "{\"index\" : {}}\n"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could al the records share a single {"index":{}}?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part of code was copied from the other Export function. The original implementer would be in better position to answer this.
else | ||
{ | ||
exporter_->Export(nostd::span<std::unique_ptr<Recordable>>(spans_arr.data(), spans_arr.size()), | ||
[this, was_force_flush_called](sdk::common::ExportResult result) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we declare async Export
of LogExporter
and SpanExporter
as void Export(const nostd::span<std::unique_ptr<Recordable>> &records, std::function<bool(ExportResult)>&& result_callback) noexcept override
?
These codes capture some data and this lambda object can not be destroyed before result_callback
be called. We need store this result_callback
some where and call it later, but with function_ref
, we can not move it.
As metioned in http://open-std.org/JTC1/SC22/WG21/docs/papers/2019/p0792r5.html ,
function_ref
is designed as a parameter, but not for storage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes good point. we can't transfer ownership of function_ref
.
std::function<bool(ExportResult)>&& result_callback
would be right callback to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DebajitDas Could you please raise another PR to modify the type of result_callback
and push to async-changes
? I will rebase from it merge the async version of OtlpHttpClient::Export
then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@owent Do you want me to have the storage logic also in this PR? I can do only the type change by EOD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Raised this PR #1278 which has the changes for type change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Closing this PR as the same has been merged to feature branch async-changes |
…porters
Fixes # (#1239)
Changes
Includes changes related to interface between processor to exporter such that async callback can be handled in the processor.
For significant contributions please make sure you have completed the following items:
CHANGELOG.md
updated for non-trivial changes