Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote propagation support for warnings in the underlying C++/Python runtime. #5

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions py/tuber/tuber.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
try:
import simplejson as json
except ModuleNotFoundError:
import json
import json # type: ignore[no-redef]


async def resolve(objname, hostname):
async def resolve(objname: str, hostname: str):
"""Create a local reference to a networked resource.

This is the recommended way to connect to remote tuberd instances.
Expand All @@ -36,7 +36,7 @@ async def resolve(objname, hostname):
# way to avoid carrying around global state, and requiring that state be
# consistent with whatever event loop is running in whichever context it's
# used. See https://docs.aiohttp.org/en/stable/faq.html
_clientsession = weakref.WeakKeyDictionary()
_clientsession: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()


class TuberError(Exception):
Expand Down Expand Up @@ -150,8 +150,8 @@ async def __call__(self):
results = []
for f, r in zip(futures, json_out):
# Always emit warnings, if any occurred
if hasattr(r, "warning") and r.warning:
for w in r.warning:
if hasattr(r, "warnings") and r.warnings:
for w in r.warnings:
warnings.warn(w)

# Resolve either a result or an error
Expand Down
57 changes: 47 additions & 10 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <csignal>
#include <filesystem>
#include <chrono>
#include <queue>
#include <boost/program_options.hpp>

#include <httpserver.hpp>
Expand Down Expand Up @@ -116,6 +117,20 @@ static inline py::object error_response(std::string const& msg) {
return py::dict("error"_a=py::dict("message"_a=msg));
}

std::vector<std::string> warning_list;
static void showwarning(py::object message,
py::object category,
py::object filename,
py::object lineno,
py::object file,
py::object line) {

if(verbose & Verbose::NOISY)
fmt::print(stderr, "... captured warning '{}'\n", py::str(message).cast<std::string>());

warning_list.push_back(py::str(message).cast<std::string>());
}

static py::dict tuber_server_invoke(py::dict &registry,
py::dict const& call,
json_loads_t const& json_loads,
Expand Down Expand Up @@ -167,13 +182,23 @@ static py::dict tuber_server_invoke(py::dict &registry,

/* Dispatch to Python - failures emerge as exceptions */
timed_scope ts("Python dispatch");
py::object response = m(*python_args, **python_kwargs);
py::object response = py::none();
try {
response = py::dict("result"_a=m(*python_args, **python_kwargs));
} catch(std::exception &e) {
response = error_response(e.what());
}

/* Capture warnings, if any */
if(!warning_list.empty()) {
response["warnings"] = warning_list;
warning_list.clear();
}

if(verbose & Verbose::NOISY)
fmt::print(stderr, "... response was {}\n", json_dumps(response));

/* Cast back to JSON, wrap in a result object, and return */
return py::dict("result"_a=response);
return response;
}

if(verbose & Verbose::NOISY)
Expand Down Expand Up @@ -233,19 +258,27 @@ class DLL_LOCAL tuber_resource : public http_resource {
* list to have the expected size. */
py::list result(py::len(request_list));

for(size_t i=0; i<result.size(); i++)
bool early_bail = false;
for(size_t i=0; i<result.size(); i++) {
/* If something went wrong earlier in the loop, don't execute anything else. */
if(early_bail) {
result[i] = error_response("Something went wrong in a preceding call.");
continue;
}

try {
result[i] = tuber_server_invoke(reg, request_list[i], json_loads, json_dumps);
} catch(std::exception &e) {
/* Indicates an internal error - this does not normally happen */
result[i] = error_response(e.what());
if(verbose & Verbose::NOISY)
fmt::print("Exception path response: {}\n", json_dumps(result[i]));
early_bail = true;
}

/* Flag subsequent calls as failures, too. This also exits
* the loop without dispatching anything else. */
for(i++; i<result.size(); i++)
result[i] = error_response("Something went wrong in a preceding call.");
if(result[i].contains("error")) {
/* Indicates client code flagged an error - this is a nominal code path */
early_bail = true;
}
}

timed_scope ts("Happy-path JSON serialization");

Expand Down Expand Up @@ -395,6 +428,10 @@ int main(int argc, char **argv) {

py::scoped_interpreter python;

/* By default, capture warnings */
py::module warnings = py::module::import("warnings");
warnings.attr("showwarning") = py::cpp_function(showwarning);

/* Learn how the Python half lives */
try {
py::eval_file(preamble);
Expand Down
57 changes: 53 additions & 4 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import textwrap
import tuber
import weakref
import warnings

from requests.packages.urllib3.util.retry import Retry

Expand Down Expand Up @@ -73,13 +74,33 @@ class NumPy:
def returns_numpy_array(self):
return np.array([0, 1, 2, 3])

class WarningsClass:
def single_warning(self, warning_text, error=False):
warnings.resetwarnings() # ensure no filters
warnings.warn(warning_text)

if error:
raise RuntimeError("Oops!")

return True

def multiple_warnings(self, warning_count=1, error=False):
warnings.resetwarnings() # ensure no filters
for n in range(warning_count):
warnings.warn(f"Warning {n+1}")

if error:
raise RuntimeError("Oops!")

return True

registry = {
"NullObject": NullObject(),
"ObjectWithMethod": ObjectWithMethod(),
"ObjectWithProperty": ObjectWithProperty(),
"Types": Types(),
"NumPy": NumPy(),
"Warnings": WarningsClass(),
"Wrapper": tm.Wrapper(),
}

Expand Down Expand Up @@ -149,13 +170,19 @@ def tuber_call(json=None, **kwargs):
yield tuber_call


def Succeeded(args=None, **kwargs):
def Succeeded(args=None, warnings=None, **kwargs):
"""Wrap a return value for a successful call in its JSON-RPC wrapper"""
if warnings is not None:
return dict(result=kwargs or args, warnings=warnings)

return dict(result=kwargs or args)


def Failed(**kwargs):
def Failed(warnings=None, **kwargs):
"""Wrap a return value for an error in its JSON-RPC wrapper"""
if warnings is not None:
return dict(error=kwargs, warnings=warnings)

return dict(error=kwargs)


Expand Down Expand Up @@ -216,11 +243,13 @@ def test_function_types_with_correct_argument_types(tuber_call):
def test_numpy_types(tuber_call):
assert tuber_call(object="NumPy", method="returns_numpy_array") == Succeeded([0, 1, 2, 3])


#
# pybind11 wrappers
#

assert tuber_call(object="Types", method="string_function", args=["this is a string"]) == Succeeded(
"this is a string"
)

@pytest.mark.orjson
def test_double_vector(tuber_call):
Expand Down Expand Up @@ -356,7 +385,8 @@ async def test_tuberpy_session_cache(tuber_call):
aiohttp.ClientSession = None # break ClientSession instantiation
await s.increment([4, 5, 6])
importlib.reload(aiohttp)
assert aiohttp.ClientSession # make sure we fixed it
# ensure we fixed it.
assert aiohttp.ClientSession # type: ignore[truthy-function]


@pytest.mark.asyncio
Expand Down Expand Up @@ -428,3 +458,22 @@ async def test_tuberpy_async_context_with_unserializable(tuber_call):

with pytest.raises(tuber.TuberRemoteError):
await r3

@pytest.mark.asyncio
async def test_tuberpy_warnings(tuber_call):
"""Ensure warnings are captured"""
s = await tuber.resolve("Warnings", TUBERD_HOSTNAME)

# Single, simple warning
with pytest.warns(match="This is a warning"):
await s.single_warning("This is a warning")

# Several in a row
with pytest.warns() as ws:
await s.multiple_warnings(warning_count=5)
assert len(ws) == 5

# Check with exceptions
with pytest.raises(tuber.TuberRemoteError), \
pytest.warns(match="This is a warning"):
await s.single_warning("This is a warning", error=True)
Loading