Skip to content

Commit

Permalink
utils: create external process class
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: NyaliaLui <[email protected]>
  • Loading branch information
michael-redpanda authored and NyaliaLui committed Aug 15, 2023
1 parent f7b02e0 commit c948847
Show file tree
Hide file tree
Showing 7 changed files with 499 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ v_cc_library(
bottomless_token_bucket.cc
utf8.cc
log_hist.cc
process.cc
DEPS
Seastar::seastar
Hdrhistogram::hdr_histogram
Expand Down
180 changes: 180 additions & 0 deletions src/v/utils/process.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "utils/process.h"

#include "ssx/future-util.h"
#include "vassert.h"
#include "vlog.h"

#include <seastar/core/loop.hh>
#include <seastar/core/when_all.hh>
#include <seastar/util/log.hh>

#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/stream.hpp>
#include <fmt/format.h>

#include <sstream>
#include <string>

namespace ssx {

namespace {
ss::logger proclog{"process"};

ss::future<> consume_input_stream(
ss::input_stream<char> stream, bool is_stdout, ss::abort_source& as) {
std::string_view stream_name{is_stdout ? "STDOUT" : "STDERR"};

std::string line;
while (!stream.eof()) {
if (as.abort_requested()) {
co_return;
}

auto buf = co_await stream.read();
if (buf.empty()) {
continue;
}

boost::iostreams::stream<boost::iostreams::basic_array_source<char>> ss{
buf.begin(), buf.end()};
while (!ss.eof()) {
std::getline(ss, line);
vlog(proclog.trace, "{}: {}", stream_name, line);
}
}
}

// Unwraps the result of a Seastar process
process::exit_status_t
unwrap_wait_status(ss::experimental::process::wait_status& result) {
return ss::visit(
result,
[](ss::experimental::process::wait_exited exited) {
return process::exit_status_t{
.exit_int = exited.exit_code, .exit_reason = "exit code"};
},
[](ss::experimental::process::wait_signaled signaled) {
return process::exit_status_t{
.exit_int = signaled.terminating_signal,
.exit_reason = "exit signal"};
});
}
} // namespace

process::~process() {
_as.request_abort();
vassert(
!is_running(),
"Processes must exit before destruction: cmd {}",
_cmd_str);
}

ss::future<std::error_code> process::spawn(
const std::filesystem::path& cmd, ss::experimental::spawn_parameters params) {
gate_guard g{_gate};
if (is_running()) {
vlog(proclog.error, "A process is already running: cmd {}", _cmd_str);
co_return process_errc::running;
}

// Set command string for logging purposes
_cmd_str = cmd.string();

auto p = co_await ss::experimental::spawn_process(cmd, std::move(params));
_process.emplace(std::move(p));

// Capture output async in the background so the broker is not blocked.
ssx::background = ssx::spawn_with_gate_then(_gate, [this] {
return consume_input_stream(_process->stdout(), true, _as)
.then([this] {
return consume_input_stream(_process->stderr(), false, _as);
})
// According to seastar docs, a process that is not wait'd may leave a
// zombie behind. So we do that here.
.then([this] { return handle_wait(); })
.finally([this] { _process.reset(); });
});

co_return process_errc::success;
}

ss::future<> process::stop() { co_await _gate.close(); }

process::exit_status_t process::exit_status() {
return unwrap_wait_status(_wait_status);
}

ss::future<> process::handle_wait() {
vassert(is_running(), "_process not instantiated");

_wait_status = co_await _process->wait();
auto status = exit_status();

// There is no signal=0, so an exit_int=0 is the success exit code.
if (status.exit_int != 0) {
vlog(
proclog.error,
"Process stop fail: cmd {}, {}={}",
_cmd_str,
status.exit_reason,
status.exit_int);
} else {
vlog(proclog.trace, "Process stop success: cmd {}", _cmd_str);
}
}

ss::future<std::error_code>
process::terminate(std::chrono::milliseconds timeout) {
gate_guard g{_gate};
if (!is_running()) {
co_return process_errc::does_not_exist;
}

try {
_process->terminate();
} catch (const std::system_error&) {
// The process is already dead
co_return process_errc::does_not_exist;
}

auto deadline = ss::steady_clock_type::now() + timeout;

co_return co_await ss::with_timeout(
deadline,
ss::do_until(
[this, deadline] {
// The callback (do_until loop) is not auto canceled on timeout,
// so exit when the deadline is exceeded.
if (ss::steady_clock_type::now() > deadline) {
return true;
}

return !is_running();
},
[] { return ss::sleep(std::chrono::milliseconds{5}); }))
.then([] {
return ss::make_ready_future<process_errc>(process_errc::success);
})
.handle_exception_type([this](const ss::timed_out_error&) {
_process->kill();
return ss::make_ready_future<process_errc>(process_errc::success);
})
.handle_exception_type([](const std::system_error&) {
// The process is already dead
return ss::make_ready_future<process_errc>(
process_errc::does_not_exist);
});
}

} // namespace ssx
135 changes: 135 additions & 0 deletions src/v/utils/process.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "gate_guard.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>
#include <seastar/util/process.hh>

#include <chrono>
#include <filesystem>
#include <system_error>
#include <utility>
#include <vector>

namespace ssx {

enum class process_errc : int16_t {
success = 0, // must be 0
does_not_exist,
running,
non_zero_exit,
signaled
};

struct process_errc_category final : public std::error_category {
const char* name() const noexcept final { return "process::errc"; }

std::string message(int c) const final {
switch (static_cast<process_errc>(c)) {
case process_errc::success:
return "Success";
case process_errc::does_not_exist:
return "Process does not exist";
case process_errc::running:
return "Process is already running";
case process_errc::non_zero_exit:
return "Process exited with non-zero status";
case process_errc::signaled:
return "Process exited on signal";
}
return "process::process_errc::unknown";
}
};
inline const std::error_category& error_category() noexcept {
static process_errc_category e;
return e;
}
inline std::error_code make_error_code(process_errc e) noexcept {
return std::error_code(static_cast<int>(e), error_category());
}

/*
* \brief Manages a POSIX fork that runs on one core
*
* Seastar supports running external processes via a POSIX fork with
* ss::experimental::spawn_process. This class does not spawn two or more forks
* within the same instance. The process runs on one core and it is easy to
* misuse. Therefore, this wrapper is responsible for:
* 1. Starting a process
* 2. Killing a process
* 3. Waiting for the process to finish
*
* Example:
* process p
* p.spawn()
* while p.is_running():
* do_other_work()
* status = p.exit_status()
* log(status.exit_int, status.exit_reason)
*/
class process {
public:
// A struct to represent the result of a process
struct exit_status_t {
int exit_int;
ss::sstring exit_reason;
};

process()
: _process{std::nullopt}
, _wait_status{ss::experimental::process::wait_exited{-1}} {}
process(const process&) = delete;
~process();

// Spawns a posix fork and starts running the command.
//
// Output from stdout and stderr are logged at TRACE level.
ss::future<std::error_code> spawn(
const std::filesystem::path& cmd,
ss::experimental::spawn_parameters params);

ss::future<> stop();

// Terminate a running process
//
// /param: timeout: Amount of time to wait for the process to respond to
// SIGTERM. SIGKILL is sent after timeout. Default 1s.
ss::future<std::error_code>
terminate(std::chrono::milliseconds timeout = std::chrono::seconds(1));

// Unwraps the result of a process into an integer for the exit code/signal
// and a string describing what caused the exit.
exit_status_t exit_status();
bool is_running() const { return _process.has_value(); }

private:
ss::sstring _cmd_str;
std::optional<ss::experimental::process> _process;
ss::experimental::process::wait_status _wait_status;
ss::gate _gate;
ss::abort_source _as;

// Wait for the running process to finish
ss::future<> handle_wait();
};

} // namespace ssx

namespace std {
template<>
struct is_error_code_enum<ssx::process_errc> : true_type {};
} // namespace std
6 changes: 6 additions & 0 deletions src/v/utils/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set(HANDLE_SIGTERM_SCRIPT "${CMAKE_CURRENT_SOURCE_DIR}/handle-sigterm.sh")
set(TIMED_LOOP_SCRIPT "${CMAKE_CURRENT_SOURCE_DIR}/timed-loop.sh")

rp_test(
UNIT_TEST
Expand All @@ -18,9 +20,13 @@ rp_test(
uuid_test.cc
vint_test.cc
waiter_queue_test.cc
process_spawn_tests.cc
LIBRARIES v::seastar_testing_main v::utils v::bytes absl::flat_hash_set
ARGS "-- -c 1"
LABELS utils
ENV
"HANDLE_SIGTERM_SCRIPT=${HANDLE_SIGTERM_SCRIPT}"
"TIMED_LOOP_SCRIPT=${TIMED_LOOP_SCRIPT}"
)

rp_test(
Expand Down
7 changes: 7 additions & 0 deletions src/v/utils/tests/handle-sigterm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

trap 'echo "sigterm called"' SIGTERM

while true; do
:
done
Loading

0 comments on commit c948847

Please sign in to comment.