Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: Allow plugins to publish and subscribe to custom notifications #4496

Merged
merged 17 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,11 @@ check-protos: $(ALL_PROGRAMS)
ifeq ($(PYTEST),)
@echo "py.test is required to run the protocol tests, please install using 'pip3 install -r requirements.txt', and rerun 'configure'."; false
else
ifeq ($(DEVELOPER),1)
@(cd external/lnprototest && PYTHONPATH=$(PYTHONPATH) LIGHTNING_SRC=../.. $(PYTEST) --runner lnprototest.clightning.Runner $(PYTEST_OPTS))
else
@echo "lnprototest target requires DEVELOPER=1, skipping"
endif
endif

pytest: $(ALL_PROGRAMS)
Expand Down
9 changes: 9 additions & 0 deletions contrib/pyln-client/pyln/client/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ def __init__(self, stdout: Optional[io.TextIOBase] = None,
}

self.options: Dict[str, Dict[str, Any]] = {}
self.notification_topics: List[str] = []

def convert_featurebits(
bits: Optional[Union[int, str, bytes]]) -> Optional[str]:
Expand Down Expand Up @@ -420,6 +421,11 @@ def add_flag_option(self, name: str, description: str,
self.add_option(name, None, description, opt_type="flag",
deprecated=deprecated)

def add_notification_topic(self, topic: str):
"""Announce that the plugin will emit notifications for the topic.
"""
self.notification_topics.append(topic)

def get_option(self, name: str) -> str:
if name not in self.options:
raise ValueError("No option with name {} registered".format(name))
Expand Down Expand Up @@ -898,6 +904,9 @@ def _getmanifest(self, **kwargs) -> JSONType:
'subscriptions': list(self.subscriptions.keys()),
'hooks': hooks,
'dynamic': self.dynamic,
'notifications': [
{"method": name} for name in self.notification_topics
],
}

# Compact the features a bit, not important.
Expand Down
66 changes: 66 additions & 0 deletions doc/PLUGINS.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ example:
"init": "0E000000",
"invoice": "00AD0000"
},
"notifications": [
{
"method": "mycustomnotification"
}
],
"dynamic": true
}
```
Expand Down Expand Up @@ -147,6 +152,13 @@ position bits for experiments. If you'd like to standardize your extension
please reach out to the [specification repository][spec] to get a featurebit
assigned.

The `notifications` array allows plugins to announce which custom
notifications they intend to send to `lightningd`. These custom
notifications can then be subscribed to by other plugins, allowing
them to communicate with each other via the existing publish-subscribe
mechanism and react to events that happen in other plugins, or collect
information based on the notification topics.

Plugins are free to register any `name` for their `rpcmethod` as long
as the name was not previously registered. This includes both built-in
methods, such as `help` and `getinfo`, as well as methods registered
Expand Down Expand Up @@ -206,6 +218,60 @@ Here's an example option set, as sent in response to `getmanifest`
],
```

#### Custom notifications

The plugins may emit custom notifications for topics they have
announced during startup. The list of notification topics declared
during startup must include all topics that may be emitted, in order
to verify that all topics plugins subscribe to are also emitted by
some other plugin, and warn if a plugin subscribes to a non-existent
topic. In case a plugin emits notifications it has not announced the
notification will be ignored and not forwarded to subscribers.

When forwarding a custom notification `lightningd` will wrap the
payload of the notification in an object that contains metadata about
the notification. The following is an example of this
transformation. The first listing is the original notification emitted
by the `sender` plugin, while the second is the the notification as
received by the `receiver` plugin (both listings show the full
[JSON-RPC][jsonrpc-spec] notification to illustrate the wrapping).

```json
{
"jsonrpc": "2.0",
"method": "mycustomnotification",
"params": {
"key": "value",
"message": "Hello fellow plugin!"
}
}
```

is delivered as

```json
{
"jsonrpc": "2.0",
"method": "mycustomnotification",
"params": {
"origin": "sender",
"payload": {
"key": "value",
"message": "Hello fellow plugin!"
}
}
}

```

The notification topic (`method` in the JSON-RPC message) must not
match one of the internal events in order to prevent breaking
subscribers that expect the existing notification format. Multiple
plugins are allowed to emit notifications for the same topics,
allowing things like metric aggregators where the aggregator
subscribes to a common topic and other plugins publish metrics as
notifications.

### The `init` method

The `init` method is required so that `lightningd` can pass back the
Expand Down
18 changes: 16 additions & 2 deletions lightningd/notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,26 @@ static struct notification *find_notification_by_topic(const char* topic)
return NULL;
}

bool notifications_have_topic(const char *topic)
bool notifications_topic_is_native(const char *topic)
{
struct notification *noti = find_notification_by_topic(topic);
if (noti)
return noti != NULL;
}

bool notifications_have_topic(const struct plugins *plugins, const char *topic)
{
struct plugin *plugin;
if (notifications_topic_is_native(topic))
return true;

/* Some plugin at some point announced it'd be emitting
* notifications to this topic. */
list_for_each(&plugins->plugins, plugin, list) {
for (size_t i = 0; i < tal_count(plugin->notification_topics); i++)
if (streq(plugin->notification_topics[i], topic))
return true;
}

return false;
}

Expand Down
6 changes: 5 additions & 1 deletion lightningd/notification.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
struct onionreply;
struct wally_psbt;

bool notifications_have_topic(const char *topic);
bool notifications_have_topic(const struct plugins *plugins, const char *topic);

/* Is the provided notification topic native, i.e., provided by
* lightningd itself? */
bool notifications_topic_is_native(const char *topic);

struct notification {
const char *topic;
Expand Down
135 changes: 116 additions & 19 deletions lightningd/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,29 @@ void plugins_free(struct plugins *plugins)
tal_free(plugins);
}

/* Check that all the plugin's subscriptions are actually for known
* notification topics. Emit a warning if that's not the case, but
* don't kill the plugin. */
static void plugin_check_subscriptions(struct plugins *plugins,
struct plugin *plugin)
{
if (plugin->subscriptions == NULL)
return;

for (size_t i = 0; i < tal_count(plugin->subscriptions); i++) {
Comment on lines +112 to +115
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: tal_count(NULL) == 0, so this loop doesn't need a precursor check.

const char *topic = plugin->subscriptions[i];
if (!notifications_have_topic(plugins, topic))
log_unusual(
plugin->log,
"topic '%s' is not a known notification topic",
topic);
}
}

/* Once they've all replied with their manifests, we can order them. */
static void check_plugins_manifests(struct plugins *plugins)
{
struct plugin *plugin;
struct plugin **depfail;

if (plugins_any_in_state(plugins, AWAITING_GETMANIFEST_RESPONSE))
Expand All @@ -121,6 +141,12 @@ static void check_plugins_manifests(struct plugins *plugins)
"Cannot meet required hook dependencies");
}

/* Check that all the subscriptions are matched with real
* topics. */
list_for_each(&plugins->plugins, plugin, list) {
plugin_check_subscriptions(plugin->plugins, plugin);
}

/* As startup, we break out once all getmanifest are returned */
if (plugins->startup)
io_break(plugins);
Expand Down Expand Up @@ -214,17 +240,18 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES,
p = tal(plugins, struct plugin);
p->plugins = plugins;
p->cmd = tal_strdup(p, path);
p->shortname = path_basename(p, p->cmd);
p->start_cmd = start_cmd;

p->plugin_state = UNCONFIGURED;
p->js_arr = tal_arr(p, struct json_stream *, 0);
p->used = 0;
p->notification_topics = tal_arr(p, const char *, 0);
p->subscriptions = NULL;
p->dynamic = false;
p->index = plugins->plugin_idx++;

p->log = new_log(p, plugins->log_book, NULL, "plugin-%s",
path_basename(tmpctx, p->cmd));
p->log = new_log(p, plugins->log_book, NULL, "plugin-%s", p->shortname);
p->methods = tal_arr(p, const char *, 0);
list_head_init(&p->plugin_opts);

Expand Down Expand Up @@ -390,6 +417,18 @@ static const char *plugin_notify_handle(struct plugin *plugin,
return NULL;
}

/* Check if the plugin is allowed to send a notification of the
* specified topic, i.e., whether the plugin has announced the topic
* correctly in its manifest. */
static bool plugin_notification_allowed(const struct plugin *plugin, const char *topic)
{
for (size_t i=0; i<tal_count(plugin->notification_topics); i++)
if (streq(plugin->notification_topics[i], topic))
return true;

return false;
}

/* Returns the error string, or NULL */
static const char *plugin_notification_handle(struct plugin *plugin,
const jsmntok_t *toks)
Expand All @@ -399,7 +438,8 @@ static const char *plugin_notification_handle(struct plugin *plugin,
const jsmntok_t *toks)
{
const jsmntok_t *methtok, *paramstok;

const char *methname;
struct jsonrpc_notification *n;
methtok = json_get_member(plugin->buffer, toks, "method");
paramstok = json_get_member(plugin->buffer, toks, "params");

Expand All @@ -420,11 +460,25 @@ static const char *plugin_notification_handle(struct plugin *plugin,
} else if (json_tok_streq(plugin->buffer, methtok, "message")
|| json_tok_streq(plugin->buffer, methtok, "progress")) {
return plugin_notify_handle(plugin, methtok, paramstok);
} else {
return tal_fmt(plugin, "Unknown notification method %.*s",
json_tok_full_len(methtok),
json_tok_full(plugin->buffer, methtok));
}

methname = json_strdup(tmpctx, plugin->buffer, methtok);

if (!plugin_notification_allowed(plugin, methname)) {
log_unusual(plugin->log,
"Plugin attempted to send a notification to topic "
"\"%s\" it hasn't declared in its manifest, not "
"forwarding to subscribers.",
methname);
} else if (notifications_have_topic(plugin->plugins, methname)) {
n = jsonrpc_notification_start(NULL, methname);
json_add_string(n->stream, "origin", plugin->shortname);
json_add_tok(n->stream, "payload", paramstok, plugin->buffer);
jsonrpc_notification_end(n);

plugins_notify(plugin->plugins, take(n));
}
return NULL;
}

/* Returns the error string, or NULL */
Expand Down Expand Up @@ -1136,14 +1190,12 @@ static const char *plugin_subscriptions_add(struct plugin *plugin,
json_tok_full_len(s),
json_tok_full(buffer, s));
}
topic = json_strdup(plugin, plugin->buffer, s);

if (!notifications_have_topic(topic)) {
return tal_fmt(
plugin,
"topic '%s' is not a known notification topic", topic);
}

/* We add all subscriptions while parsing the
* manifest, without checking that they exist, since
* later plugins may also emit notifications of custom
* types that we don't know about yet. */
topic = json_strdup(plugin, plugin->buffer, s);
tal_arr_expand(&plugin->subscriptions, topic);
}
return NULL;
Expand Down Expand Up @@ -1248,6 +1300,52 @@ static void plugin_manifest_timeout(struct plugin *plugin)
fatal("Can't recover from plugin failure, terminating.");
}

static const char *plugin_notifications_add(const char *buffer,
const jsmntok_t *result,
struct plugin *plugin)
{
char *name;
const jsmntok_t *method, *obj;
const jsmntok_t *notifications =
json_get_member(buffer, result, "notifications");

if (!notifications)
return NULL;

if (notifications->type != JSMN_ARRAY)
return tal_fmt(plugin,
"\"result.notifications\" is not an array");

for (size_t i = 0; i < notifications->size; i++) {
obj = json_get_arr(notifications, i);
Comment on lines +1319 to +1320
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json_for_each_arr(i, obj, notifications) does these two lines, BTW (and slightly more efficiently).

if (obj->type != JSMN_OBJECT)
return tal_fmt(
plugin,
"\"result.notifications[%zu]\" is not an object",
i);

method = json_get_member(buffer, obj, "method");
if (method == NULL || method->type != JSMN_STRING)
return tal_fmt(plugin,
"\"result.notifications[%zu].name\" "
"missing or not a string.",
i);

name = json_strdup(plugin, buffer, method);

if (notifications_topic_is_native(name))
return tal_fmt(plugin,
"plugin attempted to register a native "
"notification topic \"%s\", these may "
"however only be sent by lightningd",
name);

tal_arr_expand(&plugin->notification_topics, name);
}

return NULL;
}

static const char *plugin_parse_getmanifest_response(const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
Expand Down Expand Up @@ -1328,7 +1426,9 @@ static const char *plugin_parse_getmanifest_response(const char *buffer,
}
}

err = plugin_opts_add(plugin, buffer, resulttok);
err = plugin_notifications_add(buffer, resulttok, plugin);
if (!err)
err = plugin_opts_add(plugin, buffer, resulttok);
if (!err)
err = plugin_rpcmethods_add(plugin, buffer, resulttok);
if (!err)
Expand Down Expand Up @@ -1763,7 +1863,6 @@ void json_add_opt_plugins_array(struct json_stream *response,
bool important)
{
struct plugin *p;
const char *plugin_name;
struct plugin_opt *opt;
const char *opt_name;

Expand All @@ -1778,9 +1877,7 @@ void json_add_opt_plugins_array(struct json_stream *response,
json_add_string(response, "path", p->cmd);

/* FIXME: use executables basename until plugins can define their names */
plugin_name = path_basename(NULL, p->cmd);
json_add_string(response, "name", plugin_name);
tal_free(plugin_name);
json_add_string(response, "name", p->shortname);

if (!list_empty(&p->plugin_opts)) {
json_object_start(response, "options");
Expand Down
Loading