Skip to content

Commit

Permalink
ZMQ Pub Spends (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
vtnerd authored Apr 7, 2024
1 parent 4c22059 commit 6f71cc0
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 50 deletions.
59 changes: 57 additions & 2 deletions docs/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ option. Users are still required to "subscribe" to topics:
with their new height and block hash.
* `msgpack-minimal-scanned:` A msgpack object of a list of user primary
addresses with their new height and block hash.
* `json-full-spend_hook': A JSON object of a webhook spend event that has
recently triggerd (identical output as webhook).
* `msgpack-full-spend_hook`: A msgpack object of a single new account
creation that has recently triggered (identical output as webhook).


### `json-full-payment_hook`/`msgpack-full-payment_hook`
Expand Down Expand Up @@ -100,7 +104,6 @@ where matching is done by string prefixes.

> `index` is a counter used to detect dropped messages.

### `json-minimal-scanned`/`msgpack-minimal-scanned`
These topics receive PUB messages when a thread has finished scanning 1+
accounts. The last block height and hash is sent.
Expand All @@ -115,9 +118,61 @@ json-minimal-scanned:{
"addresses": [
"9xkhhJSa7ZhS5sAcTix6ozL14RwdgxbV7JZVFW4rCghN7GidutaykfxDHfgW45UPiCTXncuvZ91GNSGgxs3b2Cin9TU8nP3"
]

> `index` is a counter used to detect dropped messages.

### `json-full-spend_hook`/`msgpack-full-spend_hook`
These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)),
event is triggered for a spend (`tx-spend`). If the specified URL is
`zmq`, then notifications are only done over the ZMQ-PUB socket, otherwise the
notification is sent over ZMQ-PUB socket AND the specified URL. Invoking
`webhook_add` with a `payment_id` or `confirmation` results in a NOP because
both fields are unused for spends. This event is only triggered on
confirmation==1 (`confirmation` field on `webhook_add`s have no effect, and
mempool spends are not scanned). The intent is to notify the user of unexpected
spend operations. The end user will need to use `tx_info.input.image`,
`tx_info.source.index`, and `tx_info.source.tx_public` to determine if the
output was actually spent or being used as a decoy.

Example of the "raw" output from ZMQ-SUB side:

```json
json-full-spend_hook:{
"index": 0,
"event": {
"event": "tx-spend",
"token": "spend-xmr",
"event_id": "7ff047aa74e14f4aa978469bc0eec8ec",
"tx_info": {
"input": {
"height": 2464207,
"tx_hash": "97d4e66c4968b16fec7662adc9f8562c49108d3c5e7030c4d6dd32d97fb62540",
"image": "b0fe7acd9e17bb8b9ac2daae36d4cb607ac60ed8a101cc9b2e1f74016cf80b24",
"source": {
"high": 0,
"low": 6246316
},
"timestamp": 1711902214,
"unlock_time": 0,
"mixin_count": 15,
"sender": {
"maj_i": 0,
"min_i": 0
}
},
"source": {
"id": {
"high": 0,
"low": 6246316
},
"amount": 10000000000,
"mixin": 15,
"index": 0,
"tx_public": "426ccd6d39535a1ee8636d14978581e580fcea35c8d3843ceb32eb688a0197f7"
}
}
}
}
```

> `index` is a counter used to detect dropped messages
28 changes: 27 additions & 1 deletion src/db/data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ namespace db

namespace
{
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account"};
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account", "tx-spend"};

template<typename F, typename T>
void map_webhook_key(F& format, T& self)
Expand Down Expand Up @@ -423,6 +423,32 @@ namespace db
);
}

static void write_bytes(wire::writer& dest, const output::spend_meta_& self)
{
wire::object(dest,
WIRE_FIELD_ID(0, id),
wire::field<1>("amount", self.amount),
wire::field<2>("mixin", self.mixin_count),
wire::field<3>("index", self.index),
WIRE_FIELD_ID(4, tx_public)
);
}

static void write_bytes(wire::writer& dest, const webhook_tx_spend::tx_info_& self)
{
wire::object(dest, WIRE_FIELD_ID(0, input), WIRE_FIELD_ID(1, source));
}

void write_bytes(wire::writer& dest, const webhook_tx_spend& self)
{
wire::object(dest,
wire::field<0>("event", std::cref(self.key.type)),
wire::field<1>("token", std::cref(self.value.second.token)),
wire::field<2>("event_id", std::cref(self.value.first.event_id)),
WIRE_FIELD_ID(3, tx_info)
);
}

void write_bytes(wire::json_writer& dest, const webhook_event& self)
{
crypto::hash8 payment_id;
Expand Down
16 changes: 15 additions & 1 deletion src/db/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ namespace db
enum class webhook_type : std::uint8_t
{
tx_confirmation = 0, // cannot change values - stored in DB
new_account
new_account,
tx_spend
// unconfirmed_tx,
// new_block
// confirmed_tx,
Expand Down Expand Up @@ -384,6 +385,19 @@ namespace db
};
void write_bytes(wire::writer&, const webhook_tx_confirmation&);

//! Returned by DB when a webhook event "tripped"
struct webhook_tx_spend
{
webhook_key key;
webhook_value value;
struct tx_info_
{
spend input;
output::spend_meta_ source;
} tx_info;
};
void write_bytes(wire::writer&, const webhook_tx_spend&);

//! References a specific output that triggered a webhook
struct webhook_output
{
Expand Down
70 changes: 61 additions & 9 deletions src/db/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2573,18 +2573,69 @@ namespace db
}
return success();
}

expect<void> check_spends(std::vector<webhook_tx_spend>& out, MDB_cursor& webhooks_cur, MDB_cursor& outputs_cur, const lws::account& user)
{
const account_id user_id = user.id();
const webhook_key hook_key{user_id, webhook_type::tx_spend};
MDB_val key = lmdb::to_val(hook_key);
MDB_val value{};

// Find a tx_spend for user id
int err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_SET_KEY);
for (;;)
{
if (err)
{
if (err != MDB_NOTFOUND)
return {lmdb::error(err)};
break;
}

const auto hook = webhooks.get_value(value);
if (hook)
{
out.reserve(user.spends().size());
for (const spend& s : user.spends())
{
key = lmdb::to_val(user_id);
value = lmdb::to_val(s.link.height);
err = mdb_cursor_get(&outputs_cur, &key, &value, MDB_GET_BOTH_RANGE);

expect<output::spend_meta_> meta{common_error::kInvalidArgument};
for (;;)
{
if (err)
return {lmdb::error(err)};
meta = outputs.get_value<MONERO_FIELD(output, spend_meta)>(value);
if (!meta)
return meta.error();
if (meta->id == s.source)
break;
err = mdb_cursor_get(&outputs_cur, &key, &value, MDB_PREV_DUP);
}

out.push_back(
webhook_tx_spend{hook_key, *hook, {s, *meta}}
);
}
}
err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_NEXT_DUP);
} // every hook_key
return success();
}
} // anonymous

expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow)
expect<storage::updated> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow)
{
if (users.empty() && chain.empty())
return {std::make_pair(0, std::vector<webhook_tx_confirmation>{})};
return {updated{}};
MONERO_PRECOND(!chain.empty());
MONERO_PRECOND(db != nullptr);
if (!pow.empty())
MONERO_PRECOND(chain.size() == pow.size());

return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<updated>
{
epee::span<const crypto::hash> chain_copy{chain};
epee::span<const pow_sync> pow_copy{pow};
Expand All @@ -2593,7 +2644,7 @@ namespace db
const std::uint64_t first_new = lmdb::to_native(height) + 1;

// collect all .value() errors
std::pair<std::size_t, std::vector<webhook_tx_confirmation>> updated;
updated out{};
if (get_checkpoints().get_max_height() <= last_update)
{
cursor::blocks blocks_cur;
Expand Down Expand Up @@ -2652,7 +2703,7 @@ namespace db
const auto cur_block = blocks.get_value<block_info>(value);
if (!cur_block)
return cur_block.error();
// If a reorg past a checkpoint is being attempted
// If a reorg past a checkpoint is being attempted
if (chain[chain.size() - 1] != cur_block->hash)
return {error::bad_blockchain};

Expand Down Expand Up @@ -2743,13 +2794,14 @@ namespace db
MONERO_CHECK(check_hooks(*webhooks_cur, *events_cur, *user));
MONERO_CHECK(
add_ongoing_hooks(
updated.second, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
out.confirm_pubs, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
)
);
MONERO_CHECK(check_spends(out.spend_pubs, *webhooks_cur, *outputs_cur, *user));

++updated.first;
++out.accounts_updated;
} // ... for every account being updated ...
return {std::move(updated)};
return {std::move(out)};
});
}

Expand Down Expand Up @@ -2954,7 +3006,7 @@ namespace db
key.user = MONERO_UNWRAP(accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup.id)>(lmvalue));
}

if (key.user == account_id::invalid && type == webhook_type::tx_confirmation)
if (key.user == account_id::invalid && (type == webhook_type::tx_confirmation || type == webhook_type::tx_spend))
return {error::bad_webhook};

lmkey = lmdb::to_val(key);
Expand Down
15 changes: 11 additions & 4 deletions src/db/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ namespace db
expect<std::vector<account_address>>
reject_requests(request req, epee::span<const account_address> addresses);

//! Status of an `update` request
struct updated
{
std::vector<webhook_tx_spend> spend_pubs;
std::vector<webhook_tx_confirmation> confirm_pubs;
std::size_t accounts_updated;
};
/*!
Updates the status of user accounts, even if inactive or hidden. Duplicate
receives or spends provided in `accts` are silently ignored. If a gap in
Expand All @@ -274,15 +281,15 @@ namespace db
\param chain List of block hashes that `accts` were scanned against.
\param accts Updated to `height + chain.size()` scan height.
\return Number of updated accounts, and a list of webhooks that triggered.
\return Status via `updated` object.
*/
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
expect<updated>
update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> accts, epee::span<const pow_sync> pow);

/*!
Adds subaddresses to an account. Upon success, an account will
immediately begin tracking them in the scanner.
\param id of the account to associate new indexes
\param addresss of the account (needed to generate subaddress publc key)
\param view_key of the account (needed to generate subaddress public key)
Expand All @@ -300,7 +307,7 @@ namespace db
/*!
Add webhook to be tracked in the database. The webhook will "call"
the specified URL with JSON/msgpack information when the event occurs.
\param type The webhook event type to be tracked by the DB.
\param address is required for `type == tx_confirmation`, and is not
not needed for all other types.
Expand Down
4 changes: 4 additions & 0 deletions src/rpc/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ namespace lws { namespace rpc
if (req.address)
return {error::bad_webhook};
break;
case db::webhook_type::tx_spend:
if (!req.address)
return {error::bad_webhook};
break;
default:
return {error::bad_webhook};
}
Expand Down
13 changes: 9 additions & 4 deletions src/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ namespace lws
vec.erase(vec.begin());
};

void send_spend_hook(rpc::client& client, const epee::span<const db::webhook_tx_spend> events, net::ssl_verification_t verify_mode)
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
}

struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
Expand Down Expand Up @@ -880,11 +885,11 @@ namespace lws
}

MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->second), opts.webhook_verify);

if (updated->first != users.size())
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return;
}

Expand Down
Loading

0 comments on commit 6f71cc0

Please sign in to comment.