diff --git a/src/v/transform/probe.h b/src/v/transform/probe.h index ec725d0ac5a2..12c78f446a18 100644 --- a/src/v/transform/probe.h +++ b/src/v/transform/probe.h @@ -11,7 +11,7 @@ #pragma once #include "model/transform.h" -#include "wasm/probe.h" +#include "wasm/transform_probe.h" #include diff --git a/src/v/transform/tests/test_fixture.cc b/src/v/transform/tests/test_fixture.cc index 6ea13cf020e6..c1eb2f0985ca 100644 --- a/src/v/transform/tests/test_fixture.cc +++ b/src/v/transform/tests/test_fixture.cc @@ -64,11 +64,6 @@ ss::future<> fake_source::push_batch(model::record_batch batch) { _cond_var.broadcast(); co_return; } - -uint64_t fake_wasm_engine::memory_usage_size_bytes() const { - // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) - return 64_KiB; -}; ss::future<> fake_wasm_engine::start() { return ss::now(); } ss::future<> fake_wasm_engine::stop() { return ss::now(); } diff --git a/src/v/transform/tests/test_fixture.h b/src/v/transform/tests/test_fixture.h index ae0b11911970..6f8e798217e4 100644 --- a/src/v/transform/tests/test_fixture.h +++ b/src/v/transform/tests/test_fixture.h @@ -18,7 +18,7 @@ #include "units.h" #include "utils/notification_list.h" #include "wasm/api.h" -#include "wasm/probe.h" +#include "wasm/transform_probe.h" #include #include @@ -46,8 +46,6 @@ class fake_wasm_engine : public wasm::engine { ss::future<> start() override; ss::future<> stop() override; - - uint64_t memory_usage_size_bytes() const override; }; class fake_source : public source { diff --git a/src/v/wasm/CMakeLists.txt b/src/v/wasm/CMakeLists.txt index 5fdb9ba5b062..e9be2254ab19 100644 --- a/src/v/wasm/CMakeLists.txt +++ b/src/v/wasm/CMakeLists.txt @@ -8,7 +8,7 @@ v_cc_library( api.cc ffi.cc logger.cc - probe.cc + transform_probe.cc schema_registry.cc schema_registry_module.cc transform_module.cc @@ -16,6 +16,7 @@ v_cc_library( wasmtime.cc cache.cc allocator.cc + engine_probe.cc DEPS wasmtime v::storage diff --git a/src/v/wasm/api.h b/src/v/wasm/api.h index fd78507fee53..f97e472966ff 100644 --- a/src/v/wasm/api.h +++ b/src/v/wasm/api.h @@ -35,8 +35,6 @@ class engine { virtual ss::future<> start() = 0; virtual ss::future<> stop() = 0; - virtual uint64_t memory_usage_size_bytes() const = 0; - engine() = default; virtual ~engine() = default; engine(const engine&) = delete; diff --git a/src/v/wasm/cache.cc b/src/v/wasm/cache.cc index c755d1144dbd..7072c13b6ff1 100644 --- a/src/v/wasm/cache.cc +++ b/src/v/wasm/cache.cc @@ -110,10 +110,6 @@ class shared_engine } } - uint64_t memory_usage_size_bytes() const override { - return _underlying->memory_usage_size_bytes(); - } - private: mutex _mu; size_t _ref_count = 0; diff --git a/src/v/wasm/engine_probe.cc b/src/v/wasm/engine_probe.cc new file mode 100644 index 000000000000..4bc8ddbf5c45 --- /dev/null +++ b/src/v/wasm/engine_probe.cc @@ -0,0 +1,101 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "wasm/engine_probe.h" + +#include "metrics/metrics.h" +#include "prometheus/prometheus_sanitize.h" + +#include +#include + +namespace wasm { + +namespace internal { +engine_probe_impl::engine_probe_impl( + engine_probe_cache* cache, ss::sstring name) + : _name(std::move(name)) + , _cache(cache) { + namespace sm = ss::metrics; + + auto name_label = sm::label("function_name"); + const std::vector labels = { + name_label(_name), + }; + _public_metrics.add_group( + prometheus_sanitize::metrics_name("wasm_engine"), + { + sm::make_gauge( + "memory_usage", + sm::description("Amount of memory usage for a WebAssembly function"), + labels, + [this] { return _memory_usage; }) + .aggregate({ss::metrics::shard_label}), + sm::make_gauge( + "max_memory", + [this] { return _max_memory; }, + sm::description("Max amount of memory for a WebAssembly function"), + labels) + .aggregate({ss::metrics::shard_label}), + }); +} +engine_probe_impl::~engine_probe_impl() { + // Remove from cache when deleted. + _cache->remove_probe(_name); +} +void engine_probe_impl::report_memory_usage_delta(int64_t delta) { + _memory_usage += delta; +} + +void engine_probe_impl::report_max_memory_delta(int64_t delta) { + _max_memory += delta; +} +} // namespace internal + +engine_probe::engine_probe(ss::lw_shared_ptr impl) + : _impl(std::move(impl)) {} + +engine_probe::~engine_probe() { + if (!_impl) { + return; + } + _impl->report_max_memory_delta(-int64_t(_last_reported_max_memory)); + _impl->report_memory_usage_delta(-int64_t(_last_reported_memory_usage)); +} + +void engine_probe::report_memory_usage(uint32_t usage) { + int64_t delta = int64_t(usage) - int64_t(_last_reported_memory_usage); + _impl->report_memory_usage_delta(delta); + _last_reported_memory_usage = usage; +} + +void engine_probe::report_max_memory(uint32_t max) { + int64_t delta = int64_t(max) - int64_t(_last_reported_max_memory); + _impl->report_max_memory_delta(delta); + _last_reported_max_memory = max; +} + +engine_probe engine_probe_cache::make_probe(const ss::sstring& name) { + internal::engine_probe_impl*& probe = _probe_cache[name]; + if (probe) { + return engine_probe(probe->shared_from_this()); + } + auto probe_impl = ss::make_lw_shared( + this, name); + probe = probe_impl.get(); + return engine_probe(probe_impl); +} + +void engine_probe_cache::remove_probe(const ss::sstring& name) { + vassert( + _probe_cache.erase(name) > 0, "wasm engine probe cache inconsistency"); +} +} // namespace wasm diff --git a/src/v/wasm/engine_probe.h b/src/v/wasm/engine_probe.h new file mode 100644 index 000000000000..6caf04d95a1d --- /dev/null +++ b/src/v/wasm/engine_probe.h @@ -0,0 +1,97 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "metrics/metrics.h" +#include "seastarx.h" + +#include +#include +#include + +#include + +namespace wasm { + +class engine_probe_cache; + +namespace internal { +class engine_probe_impl + : public ss::enable_lw_shared_from_this { +public: + engine_probe_impl(engine_probe_cache* cache, ss::sstring name); + engine_probe_impl(const engine_probe_impl&) = delete; + engine_probe_impl(engine_probe_impl&&) = delete; + engine_probe_impl& operator=(const engine_probe_impl&) = delete; + engine_probe_impl& operator=(engine_probe_impl&&) = delete; + ~engine_probe_impl(); + void report_memory_usage_delta(int64_t); + void report_max_memory_delta(int64_t); + +private: + ss::sstring _name; + engine_probe_cache* _cache; + int64_t _memory_usage = 0; + int64_t _max_memory = 0; + metrics::public_metric_groups _public_metrics; +}; +} // namespace internal + +/** + * A probe for all types of wasm engines. + * + * Used to track things like memory and CPU usage. + */ +class engine_probe { +public: + explicit engine_probe(ss::lw_shared_ptr impl); + engine_probe(const engine_probe&) = delete; + engine_probe& operator=(const engine_probe&) = delete; + engine_probe(engine_probe&&) = default; + engine_probe& operator=(engine_probe&&) = default; + ~engine_probe(); + + void report_memory_usage(uint32_t); + void report_max_memory(uint32_t); + +private: + ss::lw_shared_ptr _impl; + uint32_t _last_reported_memory_usage = 0; + uint32_t _last_reported_max_memory = 0; +}; + +/** + * A cache for managing probes for all engines on a core. + * + * Since during deployments it's possible to have multiple versions of an engine + * with the same name, we need to ensure there are not conflicting probes by + * caching and reusing them. + * + * This cache must outlive all probes it creates and should have application + * scoped lifetimes. + */ +class engine_probe_cache { +public: + /** + * Create an engine probe under a given name. + */ + engine_probe make_probe(const ss::sstring&); + +private: + friend internal::engine_probe_impl; + + void remove_probe(const ss::sstring&); + + absl::btree_map _probe_cache; +}; + +} // namespace wasm diff --git a/src/v/wasm/tests/CMakeLists.txt b/src/v/wasm/tests/CMakeLists.txt index 3a297db8127d..8f1486a4a1fd 100644 --- a/src/v/wasm/tests/CMakeLists.txt +++ b/src/v/wasm/tests/CMakeLists.txt @@ -86,6 +86,19 @@ rp_test( LABELS wasm ) +rp_test( + UNIT_TEST + GTEST + BINARY_NAME wasm_probe + SOURCES + wasm_probe_test.cc + LIBRARIES + v::gtest_main + v::wasm + ARGS "-- -c 1" + LABELS wasm +) + rp_test( UNIT_TEST GTEST diff --git a/src/v/wasm/tests/wasm_cache_test.cc b/src/v/wasm/tests/wasm_cache_test.cc index 8804acbe0ddb..5760f73cac69 100644 --- a/src/v/wasm/tests/wasm_cache_test.cc +++ b/src/v/wasm/tests/wasm_cache_test.cc @@ -81,8 +81,6 @@ class fake_engine : public engine { co_return batch; } - uint64_t memory_usage_size_bytes() const override { return 0; } - private: bool _has_been_stopped = false; state* _state; diff --git a/src/v/wasm/tests/wasm_fixture.cc b/src/v/wasm/tests/wasm_fixture.cc index 0ea0271b19f4..176f58385952 100644 --- a/src/v/wasm/tests/wasm_fixture.cc +++ b/src/v/wasm/tests/wasm_fixture.cc @@ -22,7 +22,6 @@ #include "storage/parser_utils.h" #include "storage/record_batch_builder.h" #include "wasm/api.h" -#include "wasm/probe.h" #include "wasm/wasmtime.h" #include diff --git a/src/v/wasm/tests/wasm_fixture.h b/src/v/wasm/tests/wasm_fixture.h index 7e3a39bf0214..85e9e32e8096 100644 --- a/src/v/wasm/tests/wasm_fixture.h +++ b/src/v/wasm/tests/wasm_fixture.h @@ -18,8 +18,8 @@ #include "serde/rw/scalar.h" #include "test_utils/test.h" #include "wasm/api.h" -#include "wasm/probe.h" #include "wasm/schema_registry.h" +#include "wasm/transform_probe.h" #include diff --git a/src/v/wasm/tests/wasm_probe_test.cc b/src/v/wasm/tests/wasm_probe_test.cc new file mode 100644 index 000000000000..b8c6bb9fe6de --- /dev/null +++ b/src/v/wasm/tests/wasm_probe_test.cc @@ -0,0 +1,108 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "gmock/gmock.h" +#include "metrics/metrics.h" +#include "units.h" +#include "wasm/logger.h" + +#include +#include +#include + +#include +#include +#include +#include + +#include + +namespace wasm { + +namespace { + +std::optional find_guage_value( + std::string_view metric_name, const ss::sstring& function_name) { + auto metrics = ss::metrics::impl::get_value_map( + metrics::public_metrics_handle); + auto metrics_it = metrics.find(ss::sstring(metric_name)); + if (metrics_it == metrics.end()) { + return std::nullopt; + } + seastar::metrics::impl::metric_family family = metrics_it->second; + auto family_it = family.find( + {{"function_name", function_name}, + {ss::metrics::shard_label.name(), std::to_string(ss::this_shard_id())}}); + if (family_it == family.end()) { + return std::nullopt; + } + ss::metrics::impl::metric_function metric_fn + = family_it->second->get_function(); + seastar::metrics::impl::metric_value sample = metric_fn(); + return sample.ui(); +} + +std::optional +reported_memory_usage(const ss::sstring& function_name) { + return find_guage_value("wasm_engine_memory_usage", function_name); +} + +std::optional reported_max_memory(const ss::sstring& function_name) { + return find_guage_value("wasm_engine_max_memory", function_name); +} + +} // namespace + +using ::testing::Optional; + +// NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers) + +TEST(EngineProbeTest, ReportsDeltas) { + engine_probe_cache cache; + std::optional foobar_probe = cache.make_probe("foobar"); + EXPECT_THAT(reported_max_memory("foobar"), Optional(0)); + EXPECT_THAT(reported_memory_usage("foobar"), Optional(0)); + foobar_probe->report_memory_usage(1_KiB); + foobar_probe->report_max_memory(10_KiB); + EXPECT_THAT(reported_memory_usage("foobar"), Optional(1_KiB)); + EXPECT_THAT(reported_max_memory("foobar"), Optional(10_KiB)); + foobar_probe->report_memory_usage(2_KiB); + foobar_probe->report_max_memory(5_KiB); + EXPECT_THAT(reported_memory_usage("foobar"), Optional(2_KiB)); + EXPECT_THAT(reported_max_memory("foobar"), Optional(5_KiB)); + foobar_probe = std::nullopt; + EXPECT_THAT(reported_max_memory("foobar"), std::nullopt); + EXPECT_THAT(reported_memory_usage("foobar"), std::nullopt); +} + +TEST(EngineProbeTest, SumsAcrossEngines) { + engine_probe_cache cache; + std::optional p1 = cache.make_probe("foobar"); + std::optional p2 = cache.make_probe("foobar"); + std::optional p3 = cache.make_probe("foobar"); + EXPECT_THAT(reported_memory_usage("foobar"), Optional(0)); + p1->report_memory_usage(5_KiB); + p2->report_memory_usage(2_KiB); + p3->report_memory_usage(10_KiB); + EXPECT_THAT(reported_memory_usage("foobar"), 17_KiB); + p3->report_memory_usage(9_KiB); + EXPECT_THAT(reported_memory_usage("foobar"), 16_KiB); + p1 = std::nullopt; + EXPECT_THAT(reported_memory_usage("foobar"), 11_KiB); + p2 = std::nullopt; + EXPECT_THAT(reported_memory_usage("foobar"), 9_KiB); + p3 = std::nullopt; + EXPECT_THAT(reported_memory_usage("foobar"), std::nullopt); +} + +// NOLINTEND(cppcoreguidelines-avoid-magic-numbers) + +} // namespace wasm diff --git a/src/v/wasm/tests/wasm_transform_bench.cc b/src/v/wasm/tests/wasm_transform_bench.cc index cef002311b44..43b315f6030f 100644 --- a/src/v/wasm/tests/wasm_transform_bench.cc +++ b/src/v/wasm/tests/wasm_transform_bench.cc @@ -16,8 +16,8 @@ #include "units.h" #include "wasm/api.h" #include "wasm/logger.h" -#include "wasm/probe.h" #include "wasm/schema_registry.h" +#include "wasm/transform_probe.h" #include "wasm/wasmtime.h" #include diff --git a/src/v/wasm/tests/wasm_transform_test.cc b/src/v/wasm/tests/wasm_transform_test.cc index e3829035c9b5..2f09a81ec53f 100644 --- a/src/v/wasm/tests/wasm_transform_test.cc +++ b/src/v/wasm/tests/wasm_transform_test.cc @@ -59,11 +59,6 @@ TEST_F(WasmTestFixture, HandlesTransformErrors) { EXPECT_THROW(transform(make_tiny_batch()), wasm::wasm_exception); } -TEST_F(WasmTestFixture, CanComputeMemoryUsage) { - load_wasm("identity.wasm"); - ASSERT_GT(engine()->memory_usage_size_bytes(), 0); -} - namespace { std::string generate_example_avro_record( const pandaproxy::schema_registry::canonical_schema_definition& def) { diff --git a/src/v/wasm/probe.cc b/src/v/wasm/transform_probe.cc similarity index 97% rename from src/v/wasm/probe.cc rename to src/v/wasm/transform_probe.cc index 895489f60e74..b03207ce73d0 100644 --- a/src/v/wasm/probe.cc +++ b/src/v/wasm/transform_probe.cc @@ -9,7 +9,7 @@ * by the Apache License, Version 2.0 */ -#include "probe.h" +#include "wasm/transform_probe.h" #include "config/configuration.h" #include "prometheus/prometheus_sanitize.h" diff --git a/src/v/wasm/probe.h b/src/v/wasm/transform_probe.h similarity index 91% rename from src/v/wasm/probe.h rename to src/v/wasm/transform_probe.h index 2374575e1002..ef59f54a7e4e 100644 --- a/src/v/wasm/probe.h +++ b/src/v/wasm/transform_probe.h @@ -24,7 +24,11 @@ namespace wasm { -// Per transform probe +/** + * A probe for wasm engines that implement record transforms. + * + * This probe allows for reporting errors and latency for transforms. + */ class transform_probe { public: using hist_t = log_hist_public; diff --git a/src/v/wasm/wasmtime.cc b/src/v/wasm/wasmtime.cc index f670bc2a758e..02e9d2e5968f 100644 --- a/src/v/wasm/wasmtime.cc +++ b/src/v/wasm/wasmtime.cc @@ -22,12 +22,13 @@ #include "vassert.h" #include "wasm/allocator.h" #include "wasm/api.h" +#include "wasm/engine_probe.h" #include "wasm/errc.h" #include "wasm/ffi.h" #include "wasm/logger.h" -#include "wasm/probe.h" #include "wasm/schema_registry_module.h" #include "wasm/transform_module.h" +#include "wasm/transform_probe.h" #include "wasm/wasi.h" #include @@ -102,6 +103,8 @@ class wasmtime_runtime : public runtime { const heap_allocator* heap_allocator() const; + engine_probe_cache* engine_probe_cache(); + private: void register_metrics(); @@ -133,6 +136,7 @@ class wasmtime_runtime : public runtime { ss::sharded _stack_allocator; size_t _total_executable_memory = 0; metrics::public_metric_groups _public_metrics; + ss::sharded _engine_probe_cache; }; void check_error(const wasmtime_error_t* error) { @@ -358,6 +362,7 @@ class wasmtime_engine : public engine { : _runtime(runtime) , _meta(std::move(metadata)) , _preinitialized(std::move(preinitialized)) + , _probe(runtime->engine_probe_cache()->make_probe(_meta.name())) , _sr_module(sr) , _wasi_module({_meta.name()}, make_environment_vars(_meta), logger) , _transform_module(&_wasi_module) {} @@ -367,30 +372,12 @@ class wasmtime_engine : public engine { wasmtime_engine& operator=(wasmtime_engine&&) = delete; ~wasmtime_engine() override = default; - uint64_t memory_usage_size_bytes() const final { - if (!_store) { - return 0; - } - std::string_view memory_export_name = "memory"; - auto* ctx = wasmtime_store_context(_store.get()); - wasmtime_extern_t memory_extern; - bool ok = wasmtime_instance_export_get( - ctx, - &_instance, - memory_export_name.data(), - memory_export_name.size(), - &memory_extern); - if (!ok || memory_extern.kind != WASMTIME_EXTERN_MEMORY) { - return 0; - } - return wasmtime_memory_data_size(ctx, &memory_extern.of.memory); - }; - ss::future<> start() final { _transform_module.start(); co_await create_instance(); _main_task = initialize_wasi(); co_await _transform_module.await_ready(); + report_memory_usage(); } ss::future<> stop() final { @@ -405,7 +392,8 @@ class wasmtime_engine : public engine { !_pending_host_function, "pending host functions should be awaited upon before stopping the " "engine"); - co_return; + _probe.report_max_memory(0); + _probe.report_memory_usage(0); } ss::future @@ -420,6 +408,7 @@ class wasmtime_engine : public engine { } ss::future fut = co_await ss::coroutine::as_future( invoke_transform(std::move(batch), probe)); + report_memory_usage(); if (fut.failed()) { probe->transform_error(); std::rethrow_exception(fut.get_exception()); @@ -448,6 +437,29 @@ class wasmtime_engine : public engine { } private: + uint64_t memory_usage_size_bytes() const { + if (!_store) { + return 0; + } + std::string_view memory_export_name = "memory"; + auto* ctx = wasmtime_store_context(_store.get()); + wasmtime_extern_t memory_extern; + bool ok = wasmtime_instance_export_get( + ctx, + &_instance, + memory_export_name.data(), + memory_export_name.size(), + &memory_extern); + if (!ok || memory_extern.kind != WASMTIME_EXTERN_MEMORY) { + return 0; + } + return wasmtime_memory_data_size(ctx, &memory_extern.of.memory); + }; + + void report_memory_usage() { + _probe.report_memory_usage(memory_usage_size_bytes()); + } + void reset_fuel(wasmtime_context_t* ctx) { handle error( wasmtime_context_set_fuel(ctx, fuel_amount)); @@ -468,13 +480,16 @@ class wasmtime_engine : public engine { // We only ever create a single instance within this store, and we // expect that modules only have a single table and a single memory // instance declared. + uint32_t max_memory_size = _runtime->heap_allocator()->max_size(); wasmtime_store_limiter( store.get(), - /*memory_size=*/int64_t(_runtime->heap_allocator()->max_size()), + /*memory_size=*/max_memory_size, /*table_elements=*/max_table_elements, /*instances=*/1, /*tables=*/1, /*memories=*/1); + _probe.report_max_memory(max_memory_size); + _probe.report_memory_usage(0); auto* context = wasmtime_store_context(store.get()); wasmtime_context_fuel_async_yield_interval( @@ -643,6 +658,7 @@ class wasmtime_engine : public engine { wasmtime_runtime* _runtime; model::transform_metadata _meta; ss::foreign_ptr> _preinitialized; + engine_probe _probe; schema_registry_module _sr_module; wasi::preview1_module _wasi_module; @@ -1217,16 +1233,17 @@ ss::future<> wasmtime_runtime::start(runtime::config c) { sigaddset(&mask, SIGFPE); ss::throw_pthread_error(::pthread_sigmask(SIG_UNBLOCK, &mask, nullptr)); }); + co_await _engine_probe_cache.start(); register_metrics(); } void wasmtime_runtime::register_metrics() { namespace sm = ss::metrics; _public_metrics.add_group( - prometheus_sanitize::metrics_name("wasm"), + prometheus_sanitize::metrics_name("wasm_binary"), { sm::make_gauge( - "binary_executable_memory_usage", + "executable_memory_usage", [this] { return _total_executable_memory; }, sm::description("The amount of executable memory used for " "WebAssembly binaries")) @@ -1236,6 +1253,7 @@ void wasmtime_runtime::register_metrics() { ss::future<> wasmtime_runtime::stop() { _public_metrics.clear(); + co_await _engine_probe_cache.stop(); co_await _alien_thread.stop(); co_await _heap_allocator.stop(); co_await _stack_allocator.stop(); @@ -1302,6 +1320,9 @@ wasm_engine_t* wasmtime_runtime::engine() const { return _engine.get(); } const heap_allocator* wasmtime_runtime::heap_allocator() const { return &_heap_allocator.local(); } +engine_probe_cache* wasmtime_runtime::engine_probe_cache() { + return &_engine_probe_cache.local(); +} wasmtime_error_t* wasmtime_runtime::allocate_stack_memory( void* env, size_t size, wasmtime_stack_memory_t* memory_ret) {