Skip to content

Commit

Permalink
Merge pull request #14604 from rockwotj/wasm-probe
Browse files Browse the repository at this point in the history
wasm: introduce engine probe
  • Loading branch information
rockwotj authored Nov 2, 2023
2 parents 337d00b + cd5e00c commit 88f913e
Show file tree
Hide file tree
Showing 18 changed files with 376 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/v/transform/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#pragma once

#include "model/transform.h"
#include "wasm/probe.h"
#include "wasm/transform_probe.h"

#include <absl/container/flat_hash_map.h>

Expand Down
5 changes: 0 additions & 5 deletions src/v/transform/tests/test_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ ss::future<> fake_source::push_batch(model::record_batch batch) {
_cond_var.broadcast();
co_return;
}

uint64_t fake_wasm_engine::memory_usage_size_bytes() const {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers)
return 64_KiB;
};
ss::future<> fake_wasm_engine::start() { return ss::now(); }
ss::future<> fake_wasm_engine::stop() { return ss::now(); }

Expand Down
4 changes: 1 addition & 3 deletions src/v/transform/tests/test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "units.h"
#include "utils/notification_list.h"
#include "wasm/api.h"
#include "wasm/probe.h"
#include "wasm/transform_probe.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/condition-variable.hh>
Expand Down Expand Up @@ -46,8 +46,6 @@ class fake_wasm_engine : public wasm::engine {

ss::future<> start() override;
ss::future<> stop() override;

uint64_t memory_usage_size_bytes() const override;
};

class fake_source : public source {
Expand Down
3 changes: 2 additions & 1 deletion src/v/wasm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ v_cc_library(
api.cc
ffi.cc
logger.cc
probe.cc
transform_probe.cc
schema_registry.cc
schema_registry_module.cc
transform_module.cc
wasi.cc
wasmtime.cc
cache.cc
allocator.cc
engine_probe.cc
DEPS
wasmtime
v::storage
Expand Down
2 changes: 0 additions & 2 deletions src/v/wasm/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class engine {
virtual ss::future<> start() = 0;
virtual ss::future<> stop() = 0;

virtual uint64_t memory_usage_size_bytes() const = 0;

engine() = default;
virtual ~engine() = default;
engine(const engine&) = delete;
Expand Down
4 changes: 0 additions & 4 deletions src/v/wasm/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ class shared_engine
}
}

uint64_t memory_usage_size_bytes() const override {
return _underlying->memory_usage_size_bytes();
}

private:
mutex _mu;
size_t _ref_count = 0;
Expand Down
101 changes: 101 additions & 0 deletions src/v/wasm/engine_probe.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "wasm/engine_probe.h"

#include "metrics/metrics.h"
#include "prometheus/prometheus_sanitize.h"

#include <seastar/core/shared_ptr.hh>
#include <seastar/core/weak_ptr.hh>

namespace wasm {

namespace internal {
engine_probe_impl::engine_probe_impl(
engine_probe_cache* cache, ss::sstring name)
: _name(std::move(name))
, _cache(cache) {
namespace sm = ss::metrics;

auto name_label = sm::label("function_name");
const std::vector<sm::label_instance> labels = {
name_label(_name),
};
_public_metrics.add_group(
prometheus_sanitize::metrics_name("wasm_engine"),
{
sm::make_gauge(
"memory_usage",
sm::description("Amount of memory usage for a WebAssembly function"),
labels,
[this] { return _memory_usage; })
.aggregate({ss::metrics::shard_label}),
sm::make_gauge(
"max_memory",
[this] { return _max_memory; },
sm::description("Max amount of memory for a WebAssembly function"),
labels)
.aggregate({ss::metrics::shard_label}),
});
}
engine_probe_impl::~engine_probe_impl() {
// Remove from cache when deleted.
_cache->remove_probe(_name);
}
void engine_probe_impl::report_memory_usage_delta(int64_t delta) {
_memory_usage += delta;
}

void engine_probe_impl::report_max_memory_delta(int64_t delta) {
_max_memory += delta;
}
} // namespace internal

engine_probe::engine_probe(ss::lw_shared_ptr<internal::engine_probe_impl> impl)
: _impl(std::move(impl)) {}

engine_probe::~engine_probe() {
if (!_impl) {
return;
}
_impl->report_max_memory_delta(-int64_t(_last_reported_max_memory));
_impl->report_memory_usage_delta(-int64_t(_last_reported_memory_usage));
}

void engine_probe::report_memory_usage(uint32_t usage) {
int64_t delta = int64_t(usage) - int64_t(_last_reported_memory_usage);
_impl->report_memory_usage_delta(delta);
_last_reported_memory_usage = usage;
}

void engine_probe::report_max_memory(uint32_t max) {
int64_t delta = int64_t(max) - int64_t(_last_reported_max_memory);
_impl->report_max_memory_delta(delta);
_last_reported_max_memory = max;
}

engine_probe engine_probe_cache::make_probe(const ss::sstring& name) {
internal::engine_probe_impl*& probe = _probe_cache[name];
if (probe) {
return engine_probe(probe->shared_from_this());
}
auto probe_impl = ss::make_lw_shared<internal::engine_probe_impl>(
this, name);
probe = probe_impl.get();
return engine_probe(probe_impl);
}

void engine_probe_cache::remove_probe(const ss::sstring& name) {
vassert(
_probe_cache.erase(name) > 0, "wasm engine probe cache inconsistency");
}
} // namespace wasm
97 changes: 97 additions & 0 deletions src/v/wasm/engine_probe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "metrics/metrics.h"
#include "seastarx.h"

#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/weak_ptr.hh>

#include <absl/container/btree_map.h>

namespace wasm {

class engine_probe_cache;

namespace internal {
class engine_probe_impl
: public ss::enable_lw_shared_from_this<engine_probe_impl> {
public:
engine_probe_impl(engine_probe_cache* cache, ss::sstring name);
engine_probe_impl(const engine_probe_impl&) = delete;
engine_probe_impl(engine_probe_impl&&) = delete;
engine_probe_impl& operator=(const engine_probe_impl&) = delete;
engine_probe_impl& operator=(engine_probe_impl&&) = delete;
~engine_probe_impl();
void report_memory_usage_delta(int64_t);
void report_max_memory_delta(int64_t);

private:
ss::sstring _name;
engine_probe_cache* _cache;
int64_t _memory_usage = 0;
int64_t _max_memory = 0;
metrics::public_metric_groups _public_metrics;
};
} // namespace internal

/**
* A probe for all types of wasm engines.
*
* Used to track things like memory and CPU usage.
*/
class engine_probe {
public:
explicit engine_probe(ss::lw_shared_ptr<internal::engine_probe_impl> impl);
engine_probe(const engine_probe&) = delete;
engine_probe& operator=(const engine_probe&) = delete;
engine_probe(engine_probe&&) = default;
engine_probe& operator=(engine_probe&&) = default;
~engine_probe();

void report_memory_usage(uint32_t);
void report_max_memory(uint32_t);

private:
ss::lw_shared_ptr<internal::engine_probe_impl> _impl;
uint32_t _last_reported_memory_usage = 0;
uint32_t _last_reported_max_memory = 0;
};

/**
* A cache for managing probes for all engines on a core.
*
* Since during deployments it's possible to have multiple versions of an engine
* with the same name, we need to ensure there are not conflicting probes by
* caching and reusing them.
*
* This cache must outlive all probes it creates and should have application
* scoped lifetimes.
*/
class engine_probe_cache {
public:
/**
* Create an engine probe under a given name.
*/
engine_probe make_probe(const ss::sstring&);

private:
friend internal::engine_probe_impl;

void remove_probe(const ss::sstring&);

absl::btree_map<ss::sstring, internal::engine_probe_impl*> _probe_cache;
};

} // namespace wasm
13 changes: 13 additions & 0 deletions src/v/wasm/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ rp_test(
LABELS wasm
)

rp_test(
UNIT_TEST
GTEST
BINARY_NAME wasm_probe
SOURCES
wasm_probe_test.cc
LIBRARIES
v::gtest_main
v::wasm
ARGS "-- -c 1"
LABELS wasm
)

rp_test(
UNIT_TEST
GTEST
Expand Down
2 changes: 0 additions & 2 deletions src/v/wasm/tests/wasm_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ class fake_engine : public engine {
co_return batch;
}

uint64_t memory_usage_size_bytes() const override { return 0; }

private:
bool _has_been_stopped = false;
state* _state;
Expand Down
1 change: 0 additions & 1 deletion src/v/wasm/tests/wasm_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "storage/parser_utils.h"
#include "storage/record_batch_builder.h"
#include "wasm/api.h"
#include "wasm/probe.h"
#include "wasm/wasmtime.h"

#include <seastar/util/file.hh>
Expand Down
2 changes: 1 addition & 1 deletion src/v/wasm/tests/wasm_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#include "serde/rw/scalar.h"
#include "test_utils/test.h"
#include "wasm/api.h"
#include "wasm/probe.h"
#include "wasm/schema_registry.h"
#include "wasm/transform_probe.h"

#include <gtest/gtest.h>

Expand Down
Loading

0 comments on commit 88f913e

Please sign in to comment.