
RPC Block Subscription #21787

Merged · 6 commits merged into solana-labs:master on Dec 17, 2021

Conversation

segfaultdoc (Contributor)

Problem

Wintermute and other HFT firms would like a way to subscribe to an account (e.g. a Serum token account, a Serum market, etc.) and receive, per block, the transactions that mention the specified account.

Summary of Changes

This adds a blockSubscribe websocket method. Example usages:

  1. {"jsonrpc": "2.0", "id": "1", "method": "blockSubscribe", "params": ["all"]}
  • returns all txs
  1. {"jsonrpc": "2.0", "id": "1", "method": "blockSubscribe", "params": [{"mentionsAccountOrProgram": "pubkey"}]}
  • returns all txs mentioning pubkey with commitment level of finalized
  1. {"jsonrpc": "2.0", "id": "1", "method": "blockSubscribe", "params": [{"mentionsAccountOrProgram": "pubkey"}, {"commitment": "confirmed"}]}
  • returns all txs mentioning pubkey with commitment level of confirmed

If no txs match the filters for a given block, no data is returned.
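As a rough illustration, here is a minimal sketch of driving the first example above over a raw websocket; the tungstenite crate and the default pubsub port 8900 are assumptions for illustration, not part of this change:

use tungstenite::{connect, Message};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 8900 is the validator's default pubsub port; adjust for your node.
    let (mut socket, _response) = connect("ws://127.0.0.1:8900")?;
    // Issue the "all" subscription from example 1 above.
    socket.write_message(Message::Text(
        r#"{"jsonrpc": "2.0", "id": "1", "method": "blockSubscribe", "params": ["all"]}"#.into(),
    ))?;
    // The first reply acknowledges the subscription id; each later message is
    // a block notification (blocks with no matching txs produce no message).
    loop {
        println!("{}", socket.read_message()?.into_text()?);
    }
}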

mergify bot added the community (Community contribution) label on Dec 10, 2021
mergify bot requested a review from a team on December 10, 2021 20:47
codecov bot commented Dec 11, 2021

Codecov Report

Merging #21787 (ed4dca4) into master (8d22ca5) will increase coverage by 0.1%.
The diff coverage is 94.3%.

@@            Coverage Diff            @@
##           master   #21787     +/-   ##
=========================================
+ Coverage    81.2%    81.4%   +0.1%     
=========================================
  Files         516      516             
  Lines      144284   145338   +1054     
=========================================
+ Hits       117256   118318   +1062     
+ Misses      27028    27020      -8     

CriesofCarrots (Contributor) left a comment

Definitely seems useful. I think we can make this work.

I would like to see some perf testing to see how many "all" subscriptions can be supported without bogging down the rpc node.
Given the potential for large notification messages, do you think this subscription ought to be opt-in for nodes, like voteSubscribe?

(Inline review comments on rpc/src/rpc_subscription_tracker.rs, rpc/src/rpc_subscriptions.rs, and rpc/src/rpc_pubsub.rs — resolved.)
@segfaultdoc changed the title from "Seg lb/block stream" to "RPC Block Subscription" on Dec 13, 2021
@mvines mvines added the v1.9 label Dec 13, 2021
mvines (Contributor) commented Dec 13, 2021

(optimistically adding the v1.9 label as I think there are some that would love to see this feature sooner than in v1.10)

segfaultdoc (Contributor, Author) commented Dec 14, 2021

Definitely seems useful. I think we can make this work.

I would like to see some perf testing to see how many all subscriptions can be supported without bogging down the rpc node. Given the potential for large notification messages, do you think this subscription ought to be opt-in for nodes, like voteSubscribe?

Ran some perf tests locally.

Setup:
Two nodes running on separate boxes. Subscribe to one node's blockSubscribe method with the All filter and show_rewards. Start at 71 blockSubscribe connections and increment up to 99, adding one every 60 seconds. Then compare how many slots the subscribed node fell behind the other node, using the slotsSubscribe method (a sketch of that comparison follows the list below).

  • num errors indicates the number of times the channel was closed by the server
  • num messages received is how many notifications came through
  • slots won indicates how many slots the node running the blockSubscribe code was ahead of the other
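A hedged sketch of that slot comparison (not the author's actual script; PubsubClient::slot_subscribe from solana-client is real, but the endpoint names and polling loop here are assumptions):

use std::{thread::sleep, time::Duration};

use solana_client::pubsub_client::PubsubClient;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical endpoints for the node under blockSubscribe load and the bare node.
    let (_sub_a, slots_a) = PubsubClient::slot_subscribe("ws://block-sub-node:8900")?;
    let (_sub_b, slots_b) = PubsubClient::slot_subscribe("ws://bare-node:8900")?;
    let (mut latest_a, mut latest_b) = (0u64, 0u64);
    loop {
        // Drain whatever slot notifications have arrived from each node.
        while let Ok(info) = slots_a.try_recv() {
            latest_a = info.slot;
        }
        while let Ok(info) = slots_b.try_recv() {
            latest_b = info.slot;
        }
        // Positive: the blockSubscribe node "won" this sample; negative: it fell behind.
        println!("slot diff: {}", latest_a as i64 - latest_b as i64);
        sleep(Duration::from_secs(1));
    }
}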

[Screenshots of the perf-test results (Dec 14, 2021) omitted.]

buffalu (Contributor) commented Dec 15, 2021

does blockstore store processed blocks? does it make sense to allow processed here if getBlock doesn't allow it?

CriesofCarrots (Contributor)

does blockstore store processed blocks? does it make sense to allow processed here if getBlock doesn't allow it?

This won't support processed blocks until getBlock does

CriesofCarrots (Contributor) left a comment

Good stuff here.
I have a handful of nits, plus a few more substantive things we need to handle.
(CI failure seems unrelated)

(Inline review comments on rpc/src/rpc_pubsub.rs and rpc/src/rpc_subscriptions.rs — resolved.)
segfaultdoc (Contributor, Author)

does blockstore store processed blocks? does it make sense to allow processed here if getBlock doesn't allow it?

This won't support processed blocks until getBlock does

It's my understanding that WindowService inserts shreds into the Blockstore as they arrive. Is my assumption that these shreds eventually compose a 'Processed' block incorrect?

buffalu (Contributor) commented Dec 15, 2021

I would like to see some perf testing to see how many all subscriptions can be supported without bogging down the rpc node. Given the potential for large notification messages, do you think this subscription ought to be opt-in for nodes, like voteSubscribe?

for "all" it looks like we'll hit the NIC limit before we bog down the node lol. we started running tests on our laptop, but wasn't reliable, so we moved it onto the ax161.

this is running from a few commits ago, but seems like no large perf changes since then.

test setup:
bare server: hetzner ax101
block stream server: hetzner ax161

script:

  • 2 threads: connect to the ax101 and the ax161 with slotSubscribe; every time a new slot comes in, write a CSV row.
  • 1 thread: connects to the ax161 and spawns a new block subscription every 10s, starting at 0 and going to 100, logging every time it adds a new connection.

block subscription params:

use solana_client::pubsub_client::{BlockSubscription, PubsubClient};
use solana_client::rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter};
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};

// Subscribe to every block ("All" filter) at confirmed commitment, with full
// transaction details in base64 encoding and rewards omitted.
fn new_block_sub_client(ws_url: String) -> BlockSubscription {
    PubsubClient::block_subscribe(
        &ws_url,
        RpcBlockSubscribeFilter::All,
        Some(RpcBlockSubscribeConfig {
            commitment: Some(CommitmentConfig {
                commitment: CommitmentLevel::Confirmed,
            }),
            encoding: Some(UiTransactionEncoding::Base64),
            transaction_details: Some(TransactionDetails::Full),
            show_rewards: Some(false),
        }),
    )
    .expect("websocket to subscribe")
}
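For completeness, a hedged sketch of consuming this subscription (assuming, as with the crate's other pubsub helpers, that BlockSubscription pairs a subscription handle with a receiver of RpcResponse<RpcBlockUpdate> values, and that RpcBlockUpdate exposes the slot):

fn main() {
    let (_subscription, receiver) = new_block_sub_client("ws://127.0.0.1:8900".to_string());
    let mut num_messages = 0u64;
    // recv() returns Err once the server closes the channel.
    while let Ok(response) = receiver.recv() {
        num_messages += 1;
        println!("notification {} for slot {}", num_messages, response.value.slot);
    }
}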

nload at around 90 connections:
[screenshot omitted]

raw data:
slots.csv

Slots as a function of time:
[screenshot omitted]

Slot diff:
[screenshot omitted]

buffalu (Contributor) commented Dec 15, 2021

the slot diff thing is weird, perhaps reorgs?

CriesofCarrots (Contributor)

It's my understanding that WindowService inserts shreds into the Blockstore as they arrive. Is my assumption that these shreds eventually compose a 'Processed' block incorrect?

That's correct; it becomes "processed" when the block is complete (all shreds received), processed by ReplayStage (frozen), and ready to be voted on.

(Inline review comments on rpc/src/rpc_pubsub.rs, rpc/src/rpc_subscriptions.rs, and transaction-status/src/lib.rs — resolved.)
Comment on lines 944 to 948:

let ancestors = bank_forks.read().unwrap().ancestors();
let ancestors = ancestors.get(&slot);
let empty_set = &HashSet::new();
let ancestors = ancestors.unwrap_or(empty_set);
for s in *w_last_unnotified_slot..slot + 1 {

CriesofCarrots (Contributor) suggested a change:

let bank = bank_forks.read().unwrap().get(&slot);
let mut slots_to_notify: Vec<_> = bank.proper_ancestors().filter(|ancestor| ancestor >= *w_last_unnotified_slot).collect();
slots_to_notify.sort();
slots_to_notify.push(slot);
for s in slots_to_notify {

Sorry, I should have steered you to the Bank method instead. It should be faster, since it just reads the ancestors of one slot.

Wdyt of this approach, skipping non-ancestors in the loop entirely?

segfaultdoc (Contributor, Author):

So I was thinking of going this route but opted out because of time complexity. It may be negligible (maybe not?), but at a glance this is O(n log n) because of the sort, and the filter still iterates through all ancestors, vs. O(1) lookups with the HashSet approach. Perhaps the sort can be omitted completely if we do something like:

    let mut slots_to_notify: Vec<_> = (*w_last_unnotified_slot..slot)
        .filter(|s| ancestors.contains(s))
        .collect();
    slots_to_notify.push(slot);

Wdyt of the three approaches?

CriesofCarrots (Contributor):

(Sorry for delay!) I think it may be negligible, but fair point. I like approach 3 without the sort the best!

segfaultdoc (Contributor, Author):

just noticed proper_ancestors is only crate-visible and doesn't return a HashSet, so the bank_forks function call might be better suited here

segfaultdoc (Contributor, Author):

actually I have an idea!

CriesofCarrots (Contributor)

I don't recognize that CI failure, incidentally. If you wouldn't mind, can you rebase on master when you push any new changes? If the failure recurs, I'll take a closer look.

CriesofCarrots (Contributor) left a comment

This last thing: #21787 (comment)
And then lgtm :)

CriesofCarrots (Contributor) left a comment

CI is green :) Let's get this in, and we can iterate if needed. Thank you for all the polish, @segfaultdoc !

buffalu (Contributor) commented Dec 17, 2021

hell ya! lfg @segfaultdoc

CriesofCarrots (Contributor) commented Dec 17, 2021

Oh sorry, one last thing: in a separate PR, can you please add the new method to the docs here? https://github.com/solana-labs/solana/blob/master/docs/src/developing/clients/jsonrpc-api.md (with voteSubscribe, for now)
Thank you!

@CriesofCarrots CriesofCarrots merged commit 76098dd into solana-labs:master Dec 17, 2021
mergify bot pushed a commit that referenced this pull request Dec 17, 2021
* add stuff

* compiling

* add notify block

* wip

* feat: add blockSubscribe pubsub method

* address PR comments

Co-authored-by: Lucas B <[email protected]>
Co-authored-by: Zano <[email protected]>
(cherry picked from commit 76098dd)

# Conflicts:
#	Cargo.lock
#	client-test/Cargo.toml
#	rpc/src/rpc_subscriptions.rs
mergify bot added a commit that referenced this pull request Dec 18, 2021
* RPC Block Subscription (#21787)

* add stuff

* compiling

* add notify block

* wip

* feat: add blockSubscribe pubsub method

* address PR comments

Co-authored-by: Lucas B <[email protected]>
Co-authored-by: Zano <[email protected]>
(cherry picked from commit 76098dd)

# Conflicts:
#	Cargo.lock
#	client-test/Cargo.toml
#	rpc/src/rpc_subscriptions.rs

* Fix conflicts

Co-authored-by: segfaultdoctor <[email protected]>
Co-authored-by: Tyera Eulberg <[email protected]>
segfaultdoc (Contributor, Author)

Oh sorry, one last thing: in a separate PR, can you please add the new method to the docs here? https://github.com/solana-labs/solana/blob/master/docs/src/developing/clients/jsonrpc-api.md (with voteSubscribe, for now) Thank you!

most certainly! thx for bearing with me :)

segfaultdoc (Contributor, Author)

Oh sorry, one last thing: in a separate PR, can you please add the new method to the docs here? https://github.com/solana-labs/solana/blob/master/docs/src/developing/clients/jsonrpc-api.md (with voteSubscribe, for now) Thank you!

Here it is: #22002

@brooksprumo mentioned this pull request on Jan 5, 2022