Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1663 Threading issue between Meter::RegisterSyncMetricStorage and Meter::Collect #1666

Merged
merged 11 commits into from
Oct 11, 2022
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class Meter final : public opentelemetry::metrics::Meter
InstrumentDescriptor &instrument_descriptor);
std::unique_ptr<AsyncWritableMetricStorage> RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor);
opentelemetry::common::SpinLockMutex storage_lock_;
};
} // namespace metrics
} // namespace sdk
Expand Down
3 changes: 3 additions & 0 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentation
std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
auto ctx = meter_context_.lock();
if (!ctx)
{
Expand Down Expand Up @@ -251,6 +252,7 @@ std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
auto ctx = meter_context_.lock();
if (!ctx)
{
Expand Down Expand Up @@ -302,6 +304,7 @@ std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
<< "The metric context is invalid");
return std::vector<MetricData>{};
}
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
for (auto &metric_storage : storage_registry_)
{
metric_storage.second->Collect(collector, ctx->GetCollectors(), ctx->GetSDKStartTime(),
Expand Down
64 changes: 62 additions & 2 deletions sdk/test/metrics/meter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ class MockMetricReader : public MetricReader

namespace
{
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr)
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr,
std::string meter_name = "meter_name")
{
static std::shared_ptr<metrics::MeterProvider> provider(new MeterProvider());
std::unique_ptr<MetricReader> metric_reader(new MockMetricReader());
*metricReaderPtr = metric_reader.get();
auto p = std::static_pointer_cast<MeterProvider>(provider);
p->AddMetricReader(std::move(metric_reader));
auto meter = provider->GetMeter("meter_name");
auto meter = provider->GetMeter(meter_name);
return meter;
}
} // namespace
Expand Down Expand Up @@ -70,6 +71,65 @@ TEST(MeterTest, BasicAsyncTests)
}
return true;
});
observable_counter->RemoveCallback(asyc_generate_measurements, nullptr);
}

constexpr static unsigned MAX_THREADS = 25;
constexpr static unsigned MAX_ITERATIONS_MT = 1000;

TEST(MeterTest, StressMultiThread)
{
MetricReader *metric_reader_ptr = nullptr;
auto meter = InitMeter(&metric_reader_ptr, "stress_test_meter");
std::atomic<unsigned> threadCount(0);
size_t numIterations = MAX_ITERATIONS_MT;
std::atomic<bool> do_collect{false}, do_sync_create{true}, do_async_create{false};
std::vector<nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>>
observable_instruments;
std::vector<std::thread> meter_operation_threads;
size_t instrument_id = 0;
while (numIterations--)
{
for (size_t i = 0; i < MAX_THREADS; i++)
{
if (threadCount++ < MAX_THREADS)
{
auto t = std::thread([&]() {
std::this_thread::yield();
esigo marked this conversation as resolved.
Show resolved Hide resolved
if (do_sync_create.exchange(false))
{
std::string instrument_name = "test_couter_" + std::to_string(instrument_id);
meter->CreateLongCounter(instrument_name, "", "");
do_async_create.store(true);
instrument_id++;
}
if (do_async_create.exchange(false))
{
std::cout << "\n creating async thread " << std::to_string(numIterations);
auto observable_instrument =
meter->CreateLongObservableGauge("test_gauge_" + std::to_string(instrument_id));
observable_instrument->AddCallback(asyc_generate_measurements, nullptr);
observable_instruments.push_back(std::move(observable_instrument));
do_collect.store(true);
instrument_id++;
}
if (do_collect.exchange(false))
{
metric_reader_ptr->Collect([](ResourceMetrics &metric_data) { return true; });
do_sync_create.store(true);
}
});
meter_operation_threads.push_back(std::move(t));
}
}
}
for (auto &t : meter_operation_threads)
{
if (t.joinable())
{
t.join();
}
}
}

#endif