Skip to content

Commit

Permalink
make the rules sample limiter and default sampling rate configurable (#…
Browse files Browse the repository at this point in the history
…215)

* add limiter test for less-than-one-per-second

* add limiter constructor that takes just a rate

* configure rules sampler limiter with DD_TRACE_RATE_LIMIT

* configure rules sampler limiter with factory (JSON), and include in config banner

* test that DD_TRACE_RATE_LIMIT affects TracerOptions::sampling_limit_per_second

* remove priority sampling from nginx integration tests

* repurpose TracerOptions::sample_rate to be the default fallback rule sampling rate, and allow configuration using DD_TRACE_SAMPLE_RATE

* review: change error reporting in parseDouble

* review: ready for my close-up, Mr. DeMille

* review: test parsing failure of new env variables

* NaN nan NaN nan

* fix: fallback rule exists only if sample_rate is specified; otherwise, fall back to priority sampling

* Revert "remove priority sampling from nginx integration tests"

This reverts commit 80d11a7.

* fix: wrote the new catch-all rule test incorrectly

* remove redundant code, improve readability of integration test output

* fix the nginx integration tests per sampling changes

* remove debugging artifacts

* remove reference to defunct initializer in config test

* remove debugging artifact
  • Loading branch information
dgoffredo authored Mar 10, 2022
1 parent 4fba056 commit 2b017b7
Show file tree
Hide file tree
Showing 19 changed files with 357 additions and 169 deletions.
38 changes: 26 additions & 12 deletions include/datadog/opentracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,30 @@ struct TracerOptions {
// The environment this trace belongs to. eg. "" (env:none), "staging", "prod". Can also be set
// by the environment variable DD_ENV
std::string environment = "";
// This option is deprecated and may be removed in future releases.
// It is equivalent to setting a sampling rule with only a "sample_rate".
// Values must be between 0.0 and 1.0 (inclusive)
// `sample_rate` is the default sampling rate for any trace unmatched by a
// sampling rule. Setting `sample_rate` is equivalent to appending to
// `sampling_rules` a rule whose "sample_rate" is `sample_rate`. If
// `sample_rate` is NaN, then no default rule is added, and traces not
// matching any sampling rule are subject to "priority sampling," where the
// sampling rate is determined by the Datadog trace agent. This option is
// also configurable as the environment variable DD_TRACE_SAMPLE_RATE.
double sample_rate = std::nan("");
// This option is deprecated, and may be removed in future releases.
bool priority_sampling = true;
// Rules sampling is applied when initiating traces to determine the sampling rate.
// Traces that do not match any rules fall back to using priority sampling, where the rate is
// determined by a combination of user-assigned priorities and configuration from the agent.
// Configuration is specified as a JSON array of objects. Each object must have a "sample_rate",
// and the "name" and "service" fields are optional. The "sample_rate" value must be between 0.0
// and 1.0 (inclusive). Rules are applied in configured order, so a specific match should be
// specified before a wider match. If any rules are invalid, they are ignored. Can also be set by
// the environment variable DD_TRACE_SAMPLING_RULES.
std::string sampling_rules = R"([{"sample_rate": 1.0}])";
// Rules sampling is applied when initiating traces to determine the sampling
// rate. Configuration is specified as a JSON array of objects. Each object
// must have a "sample_rate", while the "name" and "service" fields are
// optional. The "sample_rate" value must be between 0.0 and 1.0 (inclusive).
// Rules are checked in order, so a more specific rule should be specified
// before a less specific rule. Note that if the `sample_rate` field of this
// `TracerOptions` has a non-NaN value, then there is an implicit rule at the
// end of the list that matches any trace unmatched by other rules, and
// applies a sampling rate of `sample_rate`. If no rule matches a trace,
// then "priority sampling" is applied instead, where the sample rate is
// determined by the Datadog trace agent. If any rules are invalid, they are
// ignored. This option is also configurable as the environment variable
// DD_TRACE_SAMPLING_RULES.
std::string sampling_rules = "[]";
// Max amount of time to wait between sending traces to agent, in ms. Agent discards traces older
// than 10s, so that is the upper bound.
int64_t write_period_ms = 1000;
Expand Down Expand Up @@ -134,6 +143,11 @@ struct TracerOptions {
}
std::cerr << level_str + ": " + message.data() + "\n";
};
// `sampling_limit_per_second` is the limit on the number of rule-controlled
// traces that may be sampled per second. This includes traces that match
// the implicit "catch-all" rule appended to `sampling_rules`. This option
// is also configurable as the environment variable DD_TRACE_RATE_LIMIT.
double sampling_limit_per_second = 100;
};

// TraceEncoder exposes the data required to encode and submit traces to the
Expand Down
4 changes: 4 additions & 0 deletions src/limiter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "limiter.h"

#include <algorithm>
#include <cmath>
#include <iostream>
#include <numeric>

Expand All @@ -27,6 +28,9 @@ Limiter::Limiter(TimeProvider now_func, long max_tokens, double refresh_rate,
previous_rates_sum_ = std::accumulate(previous_rates_.begin(), previous_rates_.end(), 0.0);
}

Limiter::Limiter(TimeProvider now_func, double allowed_per_second)
: Limiter(now_func, long(std::ceil(allowed_per_second)), allowed_per_second, 1) {}

LimitResult Limiter::allow() { return allow(1); }

LimitResult Limiter::allow(long tokens_requested) {
Expand Down
1 change: 1 addition & 0 deletions src/limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct LimitResult {
class Limiter {
public:
Limiter(TimeProvider clock, long max_tokens, double refresh_rate, long tokens_per_refresh);
Limiter(TimeProvider clock, double allowed_per_second);

LimitResult allow();
LimitResult allow(long tokens);
Expand Down
2 changes: 1 addition & 1 deletion src/opentracing_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ std::shared_ptr<ot::Tracer> makeTracer(const TracerOptions &options) {
TracerOptions opts = maybe_options.value();

auto logger = makeLogger(opts);
auto sampler = std::make_shared<RulesSampler>();
auto sampler = std::make_shared<RulesSampler>(opts.sampling_limit_per_second);
auto writer = std::shared_ptr<Writer>{
new AgentWriter(opts.agent_host, opts.agent_port, opts.agent_url,
std::chrono::milliseconds(llabs(opts.write_period_ms)), sampler, logger)};
Expand Down
2 changes: 1 addition & 1 deletion src/opentracing_external.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::tuple<std::shared_ptr<ot::Tracer>, std::shared_ptr<TraceEncoder>> makeTrace
}
TracerOptions opts = maybe_options.value();

auto sampler = std::make_shared<RulesSampler>();
auto sampler = std::make_shared<RulesSampler>(opts.sampling_limit_per_second);
auto writer = std::make_shared<ExternalWriter>(sampler, logger);
auto encoder = writer->encoder();
return std::tuple<std::shared_ptr<ot::Tracer>, std::shared_ptr<TraceEncoder>>{
Expand Down
3 changes: 3 additions & 0 deletions src/sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ void PrioritySampler::configure(json config) {

RulesSampler::RulesSampler() : sampling_limiter_(getRealTime, 100, 100.0, 1) {}

RulesSampler::RulesSampler(double limit_per_second)
: sampling_limiter_(getRealTime, limit_per_second) {}

RulesSampler::RulesSampler(TimeProvider clock, long max_tokens, double refresh_rate,
long tokens_per_refresh)
: sampling_limiter_(clock, max_tokens, refresh_rate, tokens_per_refresh) {}
Expand Down
1 change: 1 addition & 0 deletions src/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ using RuleFunc = std::function<RuleResult(const std::string&, const std::string&
class RulesSampler {
public:
RulesSampler();
explicit RulesSampler(double limit_per_second);
RulesSampler(TimeProvider clock, long max_tokens, double refresh_rate, long tokens_per_refresh);
// Some of the member functions of this class are declared `virtual` so that
// they can be overridden by `MockRulesSampler` for use in unit tests.
Expand Down
141 changes: 79 additions & 62 deletions src/tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <unistd.h>
#endif

#include <cmath>
#include <fstream>
#include <random>
#include <sstream>
Expand Down Expand Up @@ -123,7 +124,11 @@ void startupLog(TracerOptions &options) {
}
j["analytics_enabled"] = options.analytics_enabled;
j["analytics_sample_rate"] = options.analytics_rate;
if (!std::isnan(options.sample_rate)) {
j["sample_rate"] = options.sample_rate;
}
j["sampling_rules"] = options.sampling_rules;
j["sampling_limit_per_second"] = options.sampling_limit_per_second;
if (!options.tags.empty()) {
j["tags"] = options.tags;
}
Expand Down Expand Up @@ -167,71 +172,83 @@ uint64_t traceTagsPropagationMaxLength(const TracerOptions & /*options*/, const

} // namespace

void Tracer::configureRulesSampler(std::shared_ptr<RulesSampler> sampler) noexcept try {
auto log_invalid_json = [&](const std::string &description, json &object) {
logger_->Log(LogLevel::info, description + ": " + object.get<std::string>());
};
json config = json::parse(opts_.sampling_rules);
for (auto &item : config.items()) {
auto rule = item.value();
if (!rule.is_object()) {
log_invalid_json("rules sampler: unexpected item in sampling rules", rule);
continue;
}
// "sample_rate" is mandatory
if (!rule.contains("sample_rate")) {
log_invalid_json("rules sampler: rule is missing 'sample_rate'", rule);
continue;
}
if (!rule.at("sample_rate").is_number()) {
log_invalid_json("rules sampler: invalid type for 'sample_rate' (expected number)", rule);
continue;
}
auto sample_rate = rule.at("sample_rate").get<json::number_float_t>();
if (!(sample_rate >= 0.0 && sample_rate <= 1.0)) {
log_invalid_json(
"rules sampler: invalid value for sample rate (expected value between 0.0 and 1.0)",
rule);
}
// "service" and "name" are optional
bool has_service = rule.contains("service") && rule.at("service").is_string();
bool has_name = rule.contains("name") && rule.at("name").is_string();
auto nan = std::nan("");
if (has_service && has_name) {
auto svc = rule.at("service").get<std::string>();
auto nm = rule.at("name").get<std::string>();
sampler->addRule([=](const std::string &service, const std::string &name) -> RuleResult {
if (service == svc && name == nm) {
return {true, sample_rate};
}
return {false, nan};
});
} else if (has_service) {
auto svc = rule.at("service").get<std::string>();
sampler->addRule([=](const std::string &service, const std::string &) -> RuleResult {
if (service == svc) {
return {true, sample_rate};
}
return {false, nan};
});
} else if (has_name) {
auto nm = rule.at("name").get<std::string>();
sampler->addRule([=](const std::string &, const std::string &name) -> RuleResult {
if (name == nm) {
void Tracer::configureRulesSampler(std::shared_ptr<RulesSampler> sampler) noexcept {
try {
auto log_invalid_json = [&](const std::string &description, json &object) {
logger_->Log(LogLevel::info, description + ": " + object.get<std::string>());
};
json config = json::parse(opts_.sampling_rules);
for (auto &item : config.items()) {
auto rule = item.value();
if (!rule.is_object()) {
log_invalid_json("rules sampler: unexpected item in sampling rules", rule);
continue;
}
// "sample_rate" is mandatory
if (!rule.contains("sample_rate")) {
log_invalid_json("rules sampler: rule is missing 'sample_rate'", rule);
continue;
}
if (!rule.at("sample_rate").is_number()) {
log_invalid_json("rules sampler: invalid type for 'sample_rate' (expected number)", rule);
continue;
}
auto sample_rate = rule.at("sample_rate").get<json::number_float_t>();
if (!(sample_rate >= 0.0 && sample_rate <= 1.0)) {
log_invalid_json(
"rules sampler: invalid value for sample rate (expected value between 0.0 and 1.0)",
rule);
}
// "service" and "name" are optional
bool has_service = rule.contains("service") && rule.at("service").is_string();
bool has_name = rule.contains("name") && rule.at("name").is_string();
auto nan = std::nan("");
if (has_service && has_name) {
auto svc = rule.at("service").get<std::string>();
auto nm = rule.at("name").get<std::string>();
sampler->addRule([=](const std::string &service, const std::string &name) -> RuleResult {
if (service == svc && name == nm) {
return {true, sample_rate};
}
return {false, nan};
});
} else if (has_service) {
auto svc = rule.at("service").get<std::string>();
sampler->addRule([=](const std::string &service, const std::string &) -> RuleResult {
if (service == svc) {
return {true, sample_rate};
}
return {false, nan};
});
} else if (has_name) {
auto nm = rule.at("name").get<std::string>();
sampler->addRule([=](const std::string &, const std::string &name) -> RuleResult {
if (name == nm) {
return {true, sample_rate};
}
return {false, nan};
});
} else {
sampler->addRule([=](const std::string &, const std::string &) -> RuleResult {
return {true, sample_rate};
}
return {false, nan};
});
} else {
sampler->addRule([=](const std::string &, const std::string &) -> RuleResult {
return {true, sample_rate};
});
});
}
}
} catch (const json::parse_error &error) {
std::ostringstream message;
message << "rules sampler: unable to parse JSON config for rules sampler: " << error.what();
logger_->Log(LogLevel::error, message.str());
}

// If there is a configured overall sample rate, add an automatic "catch all"
// rule to the end that samples at that rate. Otherwise, don't (unmatched
// traces will be subject to priority sampling).
const double sample_rate = opts_.sample_rate;
if (!std::isnan(sample_rate)) {
sampler->addRule([=](const std::string &, const std::string &) -> RuleResult {
return {true, sample_rate};
});
}
} catch (const json::parse_error &error) {
std::ostringstream message;
message << "rules sampler: unable to parse JSON config for rules sampler: " << error.what();
logger_->Log(LogLevel::error, message.str());
}

Tracer::Tracer(TracerOptions options, std::shared_ptr<SpanBuffer> buffer, TimeProvider get_time,
Expand Down
5 changes: 4 additions & 1 deletion src/tracer_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace opentracing {

ot::expected<TracerOptions> optionsFromConfig(const char *configuration,
std::string &error_message) {
TracerOptions options{"localhost", 8126, "", "web", "", 1.0};
TracerOptions options;
json config;
try {
config = json::parse(configuration);
Expand Down Expand Up @@ -85,6 +85,9 @@ ot::expected<TracerOptions> optionsFromConfig(const char *configuration,
if (config.find("dd.trace.analytics-sample-rate") != config.end()) {
config.at("dd.trace.analytics-sample-rate").get_to(options.analytics_rate);
}
if (config.find("sampling_limit_per_second") != config.end()) {
config.at("sampling_limit_per_second").get_to(options.sampling_limit_per_second);
}
} catch (const nlohmann::detail::type_error &) {
error_message = "configuration has an argument with an incorrect type";
return ot::make_unexpected(std::make_error_code(std::errc::invalid_argument));
Expand Down
25 changes: 3 additions & 22 deletions src/tracer_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,9 @@ ot::expected<TracerOptions> optionsFromConfig(const char *configuration,
template <class TracerImpl>
class TracerFactory : public ot::TracerFactory {
public:
// Accepts configuration in JSON format, with the following keys:
// "service": Required. A string, the name of the service.
// "agent_host": A string, defaults to localhost. Can also be set by the environment variable
// DD_AGENT_HOST
// "agent_port": A number, defaults to 8126. "type": A string, defaults to web. Can also be set
// by the environment variable DD_TRACE_AGENT_PORT
// "type": A string, defaults to web.
// "environment": A string, defaults to "". The environment this trace belongs to.
// eg. "" (env:none), "staging", "prod". Can also be set by the environment variable
// DD_ENV
// "sample_rate": A double, defaults to 1.0.
// "operation_name_override": A string, if not empty it overrides the operation name (and the
// overridden operation name is recorded in the tag "operation").
// "propagation_style_extract": A list of strings, each string is one of "Datadog", "B3".
// Defaults to ["Datadog"]. The type of headers to use to propagate
// distributed traces. Can also be set by the environment variable
// DD_PROPAGATION_STYLE_EXTRACT.
// "propagation_style_inject": A list of strings, each string is one of "Datadog", "B3". Defaults
// to ["Datadog"]. The type of headers to use to receive distributed traces. Can also be set
// by the environment variable DD_PROPAGATION_STYLE_INJECT.
//
// Extra keys will be ignored.
// Accepts configuration as a JSON object. See `optionsFromConfig` in
// tracer_factory.cpp for a list of supported attributes. Unsupported
// attributes are ignored.
ot::expected<std::shared_ptr<ot::Tracer>> MakeTracer(const char *configuration,
std::string &error_message) const
noexcept override;
Expand Down
Loading

0 comments on commit 2b017b7

Please sign in to comment.