diff --git a/ipld-resolver/.github/workflows/ci.yaml b/ipld-resolver/.github/workflows/ci.yaml new file mode 100644 index 000000000..b15e12a82 --- /dev/null +++ b/ipld-resolver/.github/workflows/ci.yaml @@ -0,0 +1,90 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + branches: + - '**' + +jobs: + # Check code formatting; anything that doesn't require compilation. + pre-compile-checks: + name: Pre-compile checks + runs-on: ubuntu-latest + steps: + - name: Check out the project + uses: actions/checkout@v3 + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: nightly + components: rustfmt + - name: Check code formatting + run: make check-fmt + - name: Check license headers + run: make license + # - name: Check diagrams + # run: make check-diagrams + + # Test matrix, running tasks from the Makefile. + tests: + needs: [pre-compile-checks] + name: ${{ matrix.make.name }} (${{ matrix.os }}, ${{ matrix.rust }}) + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + rust: [nightly] + make: + - name: Lint + task: lint + - name: Test + task: test + exclude: + - rust: stable + make: + name: Lint + + env: + RUST_BACKTRACE: full + RUSTFLAGS: -Dwarnings + CARGO_INCREMENTAL: '0' + SCCACHE_CACHE_SIZE: 10G + CC: "sccache clang" + CXX: "sccache clang++" + + steps: + - name: Check out the project + uses: actions/checkout@v3 + + - name: Install Rust + uses: dtolnay/rust-toolchain@master + with: + targets: wasm32-unknown-unknown + toolchain: ${{ matrix.rust }} + components: rustfmt,clippy + + # Protobuf compiler required by libp2p-core + - name: Install Protoc + uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Setup sccache + uses: hanabi1224/sccache-action@v1.2.0 # https://github.com/hanabi1224/sccache-action used by Forest. + timeout-minutes: 5 + continue-on-error: true + with: + release-name: v0.3.1 + # Caching everything separately, in case they don't ask for the same things to be compiled. + cache-key: ${{ matrix.make.name }}-${{ matrix.os }}-${{matrix.rust}}-${{ hashFiles('Cargo.lock', 'rust-toolchain', 'rust-toolchain.toml') }} + # Not sure why we should ever update a cache that has the hash of the lock file in it. + # In Forest it only contains the rust-toolchain, so it makes sense to update because dependencies could have changed. + cache-update: false + + - name: ${{ matrix.make.name }} + run: make ${{ matrix.make.task }} diff --git a/ipld-resolver/.gitignore b/ipld-resolver/.gitignore new file mode 100644 index 000000000..a5c943bd7 --- /dev/null +++ b/ipld-resolver/.gitignore @@ -0,0 +1,5 @@ +.idea/ +*.iml +/target +docs/diagrams/plantuml.jar +Cargo.lock diff --git a/ipld-resolver/Cargo.toml b/ipld-resolver/Cargo.toml new file mode 100644 index 000000000..3263ec61f --- /dev/null +++ b/ipld-resolver/Cargo.toml @@ -0,0 +1,67 @@ +[package] +name = "ipc_ipld_resolver" +version = "0.1.0" +description = "P2P library to resolve IPLD content across IPC subnets." 
+authors = ["Protocol Labs"] +edition = "2021" +license-file = "LICENSE" + +[dependencies] +anyhow = "1.0" +base64 = "0.21.0" +blake2b_simd = "1.0" +bloom = "0.3" +gcra = "0.4" +lazy_static = "1.4" +libipld = { version = "0.14", default-features = false, features = ["dag-cbor"] } +libp2p = { version = "0.50", default-features = false, features = [ + "gossipsub", + "kad", + "identify", + "ping", + "noise", + "yamux", + "tcp", + "dns", + "mplex", + "request-response", + "metrics", + "tokio", + "macros", + "serde", + "secp256k1", + "plaintext", +] } +libp2p-bitswap = "0.25.1" +log = "0.4" +prometheus = "0.13" +quickcheck = { version = "1", optional = true } +rand = "0.8" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0.91", features = ["raw_value"] } +thiserror = "1.0.38" +tokio = { version = "1.16", features = ["full"] } + +fvm_ipld_encoding = "0.3" +fvm_shared = { version = "~3.2", default-features = false, features = ["crypto"], optional = true } +fvm_ipld_blockstore = { version = "0.1", optional = true } + +# Using the IPC SDK without the `fil-actor` feature so as not to depend on the actor `Runtime`. +# Using the `main` branch instead of the highest available tag `v0.3.0` because the latter doesn't have a feature flag for the `Runtime`. +ipc-sdk = { git = "https://github.com/consensus-shipyard/ipc.git", default-features = false, branch = "dev" } + +[dev-dependencies] +quickcheck_macros = "1" +env_logger = "0.10" +fvm_ipld_hamt = "0.6" + +ipc_ipld_resolver = { path = ".", features = ["arb"] } + +[features] +default = ["arb", "missing_blocks"] +arb = ["quickcheck", "fvm_shared/arb"] +missing_blocks = ["fvm_ipld_blockstore"] + +[patch.crates-io] +# Use stable-only features. +gcra = { git = "https://github.com/consensus-shipyard/gcra-rs.git", branch = "main" } diff --git a/ipld-resolver/LICENSE b/ipld-resolver/LICENSE new file mode 100644 index 000000000..11df919e1 --- /dev/null +++ b/ipld-resolver/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 ConsensusLab + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/ipld-resolver/Makefile b/ipld-resolver/Makefile new file mode 100644 index 000000000..c5ef24e9c --- /dev/null +++ b/ipld-resolver/Makefile @@ -0,0 +1,35 @@ +.PHONY: all build test lint license check-fmt check-clippy diagrams + +all: test build + +build: + cargo build -Z unstable-options --release + +test: + cargo test --release --workspace + +clean: + cargo clean + +lint: \ + license \ + check-fmt \ + check-clippy + +license: + ./scripts/add_license.sh + +check-fmt: + cargo fmt --all --check + +check-clippy: + cargo clippy --all --tests -- -D clippy::all + +diagrams: + $(MAKE) -C docs/diagrams + +check-diagrams: diagrams + if git diff --name-only docs/diagrams | grep .png; then \ + echo "There are uncommitted changes to the diagrams"; \ + exit 1; \ + fi diff --git a/ipld-resolver/README.md b/ipld-resolver/README.md new file mode 100644 index 000000000..b9325eaaa --- /dev/null +++ b/ipld-resolver/README.md @@ -0,0 +1,60 @@ +# IPLD Resolver + +The IPLD Resolver is a Peer-to-Peer library which can be used to resolve arbitrary CIDs from subnets in InterPlanetary Consensus. + +See the [docs](./docs/) for a conceptual overview. + +## Usage + +Please have a look at the [smoke test](./tests/smoke.rs) for an example of using the library. + +The following snippet demonstrates how one would create a resolver instance and use it: + +```rust +async fn main() { + let config = Config { + connection: ConnectionConfig { + listen_addr: "/ip4/127.0.0.1/tcp/0".parse().unwrap(), + expected_peer_count: 1000, + max_incoming: 25, + max_peers_per_query: 10, + event_buffer_capacity: 100, + }, + network: NetworkConfig { + local_key: Keypair::generate_secp256k1(), + network_name: "example".to_owned(), + }, + discovery: DiscoveryConfig { + static_addresses: vec!["/ip4/95.217.194.97/tcp/8008/p2p/12D3KooWC1EaEEpghwnPdd89LaPTKEweD1PRLz4aRBkJEA9UiUuS".parse().unwrap()] + target_connections: 50, + enable_kademlia: true, + }, + membership: MembershipConfig { + static_subnets: vec![], + max_subnets: 10, + publish_interval: Duration::from_secs(300), + min_time_between_publish: Duration::from_secs(5), + max_provider_age: Duration::from_secs(60), + }, + }; + + let store = todo!("implement BitswapStore and a Blockstore"); + + let service = Service::new(config, store.clone()); + let client = service.client(); + + tokio::task::spawn(async move { service.run().await }); + + let cid: Cid = todo!("the CID we want to resolve"); + let subnet_id: SubnetID = todo!("the SubnetID from where the CID can be resolved"); + + match client.resolve(cid, subnet_id).await.unwrap() { + Ok(()) => { + let _content: MyContent = store.get(cid).unwrap(); + } + Err(e) => { + println!("{cid} could not be resolved from {subnet_id}: {e}") + } + } +} +``` diff --git a/ipld-resolver/docs/README.md b/ipld-resolver/docs/README.md new file mode 100644 index 000000000..ad8c4f7de --- /dev/null +++ b/ipld-resolver/docs/README.md @@ -0,0 +1,82 @@ +# IPLD Resolver + +The IPLD Resolver is a library that [IPC Agents](https://github.com/consensus-shipyard/ipc/) can use to exchange data between subnets in IPLD format. + +## Checkpointing + +The most typical use case would be the propagation of checkpoints from child subnets to the parent subnet. 
+
+### Checkpoint Schema
+
+One possible conceptual model of checkpointing is depicted by the following Entity Relationship diagram:
+
+![Checkpoint Schema](diagrams/checkpoint_schema.png)
+
+It shows that the Subnet Actor in the parent subnet governs the power of validators in the child subnet by proposing _Configurations_, which the child subnet is free to adopt in its _Epochs_ when the time is right, communicating back the next adopted config via _Checkpoints_.
+
+At the end of an epoch, the validators in the child subnet produce a checkpoint over some contents, notably the cross-messages they want to propagate towards the parent subnet. Through the cross-messages, the checkpoint indirectly points to individual messages that users or actors wanted to send.
+
+Once enough signatures are collected to form a Quorum Certificate over the checkpoint (the specific rules are in the jurisdiction of the Subnet Actor), the checkpoint is submitted to the parent ledger.
+
+However, the submitted checkpoint does not contain the raw messages, only the meta-data. The content needs to be resolved using the IPLD Resolver, as indicated by the dotted line.
+
+### Checkpoint Submission and Resolution
+
+The following sequence diagram shows one possible way checkpoints can be submitted from the child to the parent subnet.
+
+It depicts two validators: one participating only on the parent subnet, and the other on the child subnet; the latter also has to run at least a full node on the parent subnet. Both validators run one IPC Agent each.
+
+The diagram shows that at the end of the epoch the child subnet validators produce a Quorum Certificate over the checkpoint, which some of their agents submit to the parent subnet.
+
+After that, the parent subnet nodes reach out to their associated IPC Agent to resolve the messages referenced by the checkpoint, which the Agent does by communicating with some of its child-subnet peers.
+
+![Checkpoint Submission](diagrams/checkpoint_submission.png)
+
+This is just a high-level view of what happens during message resolution. In the next section we will delve deeper into the internals of the IPLD Resolver.
+
+
+## IPLD Resolver Sub-components
+
+The IPLD Resolver uses libp2p to form a Peer-to-Peer network, using the following protocols:
+* [Ping](https://github.com/libp2p/rust-libp2p/tree/v0.50.1/protocols/ping)
+* [Identify](https://github.com/libp2p/rust-libp2p/tree/v0.50.1/protocols/identify) is used to learn the listening address of the remote peers
+* [Kademlia](https://github.com/libp2p/rust-libp2p/tree/v0.50.1/protocols/kad) is used for peer discovery
+* [Gossipsub](https://github.com/libp2p/rust-libp2p/tree/v0.50.1/protocols/gossipsub) is used to announce information about subnets the peers provide data for
+* [Bitswap](https://github.com/ipfs-rust/libp2p-bitswap) is used to resolve CIDs to content
+
+See the libp2p [specs](https://github.com/libp2p/specs) and [docs](https://docs.libp2p.io/concepts/fundamentals/protocols/) for details on each protocol, and look [here](https://docs.ipfs.tech/concepts/bitswap/) for Bitswap.
+
+The Resolver is completely agnostic about what content it can resolve, as long as it is addressed by CIDs; it is not aware of the checkpointing use case above.
+
+The interface with the host system is through a host-provided implementation of the [BitswapStore](https://github.com/ipfs-rust/libp2p-bitswap/blob/7dd9cececda3e4a8f6e14c200a4b457159d8db33/src/behaviour.rs#L55), which the library uses to retrieve and store content.
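+To give a feel for the shape of that interface, here is a minimal, illustrative sketch of a store backed by an in-memory map. The method names (`contains`, `get`, `insert`, `missing_blocks`) follow the linked trait definition, but the exact signatures, the use of `DefaultParams`, and the `InMemoryStore` type are assumptions made for illustration only; a real host would wrap its own blockstore.
+
+```rust
+use std::collections::HashMap;
+
+use anyhow::Result;
+use libipld::{Block, Cid, DefaultParams};
+use libp2p_bitswap::BitswapStore;
+
+/// Illustrative store; a real host would wrap its own blockstore instead.
+#[derive(Default)]
+struct InMemoryStore {
+    blocks: HashMap<Cid, Vec<u8>>,
+}
+
+impl BitswapStore for InMemoryStore {
+    type Params = DefaultParams;
+
+    fn contains(&mut self, cid: &Cid) -> Result<bool> {
+        Ok(self.blocks.contains_key(cid))
+    }
+
+    fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
+        Ok(self.blocks.get(cid).cloned())
+    }
+
+    fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
+        self.blocks.insert(*block.cid(), block.data().to_vec());
+        Ok(())
+    }
+
+    fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
+        // A real implementation would follow the IPLD links of `cid` and
+        // return the CIDs it does not have yet, e.g. with the
+        // `missing_blocks` helper described in the next paragraph.
+        if self.blocks.contains_key(cid) {
+            Ok(vec![])
+        } else {
+            Ok(vec![*cid])
+        }
+    }
+}
+```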
Implementors can make use of the [missing_blocks](../src/missing_blocks.rs) helper method which recursively collects all CIDs from an IPLD `Blockstore`, starting from the root CID we are looking for. + +Internally the protocols are wrapped into behaviours that interpret their events and manage their associated state: +* `Discovery` wraps `Kademlia` +* `Membership` wraps `Gossipsub` +* `Content` wraps `Bitswap` + +The following diagram shows a typical sequence of events within the IPLD Resolver. For brevity, only one peer is shown in detail; it's counterpart is represented as a single boundary. + +![IPLD Resolver](diagrams/ipld_resolver.png) + +# Diagram Automation + +The diagrams in this directory can be rendered with `make diagrams`. + +Adding the following script to `.git/hooks/pre-commit` automatically renders and checks in the images when we commit changes to the them. CI should also check that there are no uncommitted changes. + +```bash +#!/usr/bin/env bash + +# If any command fails, exit immediately with that command's exit status +set -eo pipefail + +# Redirect output to stderr. +exec 1>&2 + +if git diff --cached --name-only --diff-filter=d | grep .puml +then + make diagrams + git add docs/diagrams/*.png +fi +``` diff --git a/ipld-resolver/docs/diagrams/Makefile b/ipld-resolver/docs/diagrams/Makefile new file mode 100644 index 000000000..5d83b2f6a --- /dev/null +++ b/ipld-resolver/docs/diagrams/Makefile @@ -0,0 +1,16 @@ +PUMLS = $(shell find . -type f -name "*.puml") +PNGS = $(PUMLS:.puml=.png) +PUML_VER=1.2023.2 + +.PHONY: all +all: diagrams + +.PHONY: diagrams +diagrams: $(PNGS) + +plantuml.jar: + wget -O $@ https://github.com/plantuml/plantuml/releases/download/v$(PUML_VER)/plantuml-$(PUML_VER).jar --no-check-certificate --quiet + +%.png: plantuml.jar %.puml + @# Using pipelining to preserve file names. + cat $*.puml | java -jar plantuml.jar -pipe > $*.png diff --git a/ipld-resolver/docs/diagrams/checkpoint_schema.png b/ipld-resolver/docs/diagrams/checkpoint_schema.png new file mode 100644 index 000000000..f7f10b177 Binary files /dev/null and b/ipld-resolver/docs/diagrams/checkpoint_schema.png differ diff --git a/ipld-resolver/docs/diagrams/checkpoint_schema.puml b/ipld-resolver/docs/diagrams/checkpoint_schema.puml new file mode 100644 index 000000000..1232d428e --- /dev/null +++ b/ipld-resolver/docs/diagrams/checkpoint_schema.puml @@ -0,0 +1,112 @@ + +@startuml Checkpointing Schema + +package "Subnet Actor in parent subnet" #79ADDC { + entity "Validator" as validator { + * public_key <> + -- + * power: delegated stake + } + + entity "Configuration" as config { + * config_number <> + -- + } + + entity "Validator Snapshot" as validator_snapshot { + * config_number <> + * public_key <> + -- + * power: delegated stake in the config + } + + entity "Submitted Checkpoint" as submit_ckpt { + * checkpoint <> + } + note bottom of submit_ckpt + Such that the signatures + form a Quorum in the config. + end note +} + +package "checkpointing" #FFEE93 { + entity "Checkpoint" as ckpt { + * epoch_number <> + -- + * next_config_number <> + * state_hash: CID + ... + } + note bottom of ckpt + Next config indicates who will + sign the next checkpoint. 
+ end note + + entity "Signature" as sig { + * public_key <>: validator public key + * checkpoint <> + -- + * signature + } +} + + +package "child subnet" #FFC09F { + entity "Epoch" as epoch { + * epoch_number <> + -- + * config_number <> + * start_block_height <> + * epoch_length + } + + entity "Cross Messages" as cross_msgs { + * epoch_number <> + -- + * messages <>: CID + } + note bottom of cross_msgs + An AMT containing CIDs + end note + + entity "Messsage" as msg { + * id: CID <> + -- + * from: address <> + * to: address <> + * nonce + * payload + ... + } + + entity "Block" as block { + * hash <>: CID + -- + * height + * messages <>: CID + } +} + +block |o--o{ msg + +validator_snapshot }|--|| config +validator_snapshot }o--|| validator + +epoch }o--|| config +epoch |o--|| block +epoch ||--|| cross_msgs +epoch ||--o| ckpt + +sig |o--|| ckpt +sig }o--|| validator + +ckpt }o--|| config +ckpt ||--o| submit_ckpt + + +cross_msgs |o--o{ msg + +submit_ckpt .. cross_msgs : can be resolved with the IPLD Resolver + + +@enduml diff --git a/ipld-resolver/docs/diagrams/checkpoint_submission.png b/ipld-resolver/docs/diagrams/checkpoint_submission.png new file mode 100644 index 000000000..299c17356 Binary files /dev/null and b/ipld-resolver/docs/diagrams/checkpoint_submission.png differ diff --git a/ipld-resolver/docs/diagrams/checkpoint_submission.puml b/ipld-resolver/docs/diagrams/checkpoint_submission.puml new file mode 100644 index 000000000..a888da884 --- /dev/null +++ b/ipld-resolver/docs/diagrams/checkpoint_submission.puml @@ -0,0 +1,87 @@ +@startuml Submit Checkpoint +box "Parent Subnet Validator" #EEFBF5 +participant "Parent Lotus (Validator/Miner)" as parent_lotus +control "IPC Agent" as parent_agent +end box + +box "Child Subnet Validator" #FEEFEC +control "IPC Agent" as child_agent +participant "Parent Lotus (Full)" as full_lotus +participant "Child Lotus (Validator)" as child_lotus +actor "Validator" as validator +end box + +== Initialize == + +parent_agent --> parent_lotus: subscribe + +validator --> child_lotus: start +validator --> full_lotus: start +validator --> child_agent: start + +child_agent --> full_lotus: subscribe +child_agent --> child_lotus: subscribe + +validator -> child_agent ++: join subnet +child_agent -> parent_lotus --: join subnet +parent_lotus -> parent_lotus: create block and\nexecute transaction + +== During Epoch == + +loop + parent_lotus --> full_lotus: broadcast block + alt if contains top-down messages + full_lotus --> child_agent: observe finalized top-down message + child_agent -> child_lotus: submit finalized top-down message + child_lotus -> child_agent: is finalized on parent? + note right + Check messages proposed by others. + end note + end + + alt if has power to create block + child_lotus -> child_lotus: create block + end + + child_lotus -> child_lotus: receive block +end + +== End of Epoch == + +child_lotus -> child_lotus: next block producer\ncreates checkpoint +note left + Ledger rules dictate + checkpoint contents. +end note +child_lotus --> child_agent: observe checkpoint + +alt if validator in epoch + child_agent -> child_lotus: submit signature over checkpoint +end + +loop + child_lotus -> child_lotus: create block + note left + Accumulate signatures + in the ledger. + end note + ... wait for quorum of signatures ... 
+end + +child_lotus --> child_agent: observe quorum +child_agent -> parent_lotus: submit checkpoint with quorum certificate + +parent_lotus -> parent_agent ++: resolve checkpoint CID +parent_agent -> child_agent ++: resolve checkpoint CID +note right +This is where the IPLD Resolver +comes into play. +end note +child_agent -> child_lotus: fetch checkpoint contents +return checkpoint contents +return checkpoint contents + +parent_lotus -> parent_lotus: create block and\nexecute checkpoint +parent_lotus --> full_lotus: broadcast block + +@enduml diff --git a/ipld-resolver/docs/diagrams/ipld_resolver.png b/ipld-resolver/docs/diagrams/ipld_resolver.png new file mode 100644 index 000000000..cec82894a Binary files /dev/null and b/ipld-resolver/docs/diagrams/ipld_resolver.png differ diff --git a/ipld-resolver/docs/diagrams/ipld_resolver.puml b/ipld-resolver/docs/diagrams/ipld_resolver.puml new file mode 100644 index 000000000..48903712e --- /dev/null +++ b/ipld-resolver/docs/diagrams/ipld_resolver.puml @@ -0,0 +1,100 @@ +@startuml IPLD Resolver +actor Host +boundary Client +control Service +database SubnetProvidersCache +database BitswapStore +participant "Content (Bitswap)" as Content +participant "Membership (Gossipsub)" as Membership +participant "Discovery (Kademlia)" as Discovery +participant Identify +boundary Remote + + +Host -> Service ++: new +Service -> Identify: create +Service -> Discovery: create +Service -> Membership: create +Service -> Content: create +return (Service, Client) + +Host -> Service ++: run + +== Bootstrapping == + +Membership -> Remote: subscribe to membership topic +Service -> Discovery ++: bootstrap from seed peers +Discovery -> Remote: find neighbours +Remote -> Discovery: peer addresses + +Discovery -> Discovery: add address +Discovery -> Service ++: peer routable +Service -> Membership --: peer routable + +Host -> Client ++: set provided subnets +Client -> Service --++: set provided subnets +Service -> Membership --++: set provided subnets +Membership -> Remote --: publish subnets to membership topic + +Discovery -> Service --: bootstrap finished + +Remote -> Identify: listening address +Identify -> Service ++: listening address +Service -> Discovery ++: add address +Discovery -> Service --: peer routable +Service -> Membership --: peer routable + +== Gossiping == + +loop + alt publish interval tick + Membership -> Remote: publish SignedProviderRecord + Membership -> SubnetProvidersCache: prune expired records + else + Remote -> Membership ++: SignedProviderRecord + alt if peer routable + Membership -> SubnetProvidersCache --: add provider + end + end +end + +== Resolution == + +Host -> Client ++: resolve CID from subnet +Client -> Service ++: resolve CID from subnet +Service -> Membership: get providers of subnet +Service -> Service: prioritize peers, connected first +loop + Service -> Content ++: resolve CID from first N peers + Content -> BitswapStore: get missing blocks of root CID + loop while has missing CID + loop for each peer + Content -> Remote: want-have CID + Remote -> Content: have-block true/false + note left + Gather peers who can be asked. 
+ end note + end + loop until have block or no more peers to try + Content -> Remote: want-block CID + alt block is received + Remote -> Content: block + Content -> BitswapStore: insert block + Content -> BitswapStore: get missing blocks of retrieved CID + end + end + end + Content -> Service --: resolution result + alt if failed to resolve but has fallback peers + Service -> Service: pick next N peers + else + Service -> Client --: resolution result + end +end +Client -> Host --: resolution result + +alt if succeeded + Host -> BitswapStore: retrieve content by CID +end + +@enduml diff --git a/ipld-resolver/rust-toolchain.toml b/ipld-resolver/rust-toolchain.toml new file mode 100644 index 000000000..a59cf37c5 --- /dev/null +++ b/ipld-resolver/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "stable" +components = ["clippy", "llvm-tools", "rustfmt"] +targets = ["wasm32-unknown-unknown"] diff --git a/ipld-resolver/scripts/add_license.sh b/ipld-resolver/scripts/add_license.sh new file mode 100755 index 000000000..d011bd418 --- /dev/null +++ b/ipld-resolver/scripts/add_license.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# +# Checks if the source code contains required license and adds it if necessary. +# Returns 1 if there was a missing license, 0 otherwise. +COPYRIGHT_TXT=$(dirname $0)/copyright.txt + +# Any year is fine. We can update the year as a single PR in all files that have it up to last year. +PAT_PL=".*// Copyright 2022-202\d Protocol Labs.*" +PAT_SPDX="/*// SPDX-License-Identifier: MIT.*" + +# Look at enough lines so that we can include multiple copyright holders. +LINES=4 + +ret=0 + +# NOTE: When files are moved/split/deleted, the following queries would find and recreate them in the original place. +# To avoid that, first commit the changes, then run the linter; that way only the new places are affected. + +# Look for files without headers. +for file in $(git grep --cached -Il '' -- '*.rs'); do + header=$(head -$LINES "$file") + if ! echo "$header" | grep -q -P "$PAT_SPDX"; then + echo "$file was missing header" + cat $COPYRIGHT_TXT "$file" > temp + mv temp "$file" + ret=1 + fi +done + +# Look for changes that don't have the new copyright holder. +for file in $(git diff --diff-filter=d --name-only main -- '*.rs'); do + header=$(head -$LINES "$file") + if ! echo "$header" | grep -q -P "$PAT_PL"; then + echo "$file was missing Protocol Labs" + head -1 $COPYRIGHT_TXT > temp + cat "$file" >> temp + mv temp "$file" + ret=1 + fi +done + +exit $ret diff --git a/ipld-resolver/scripts/copyright.txt b/ipld-resolver/scripts/copyright.txt new file mode 100644 index 000000000..754c135d2 --- /dev/null +++ b/ipld-resolver/scripts/copyright.txt @@ -0,0 +1,2 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT diff --git a/ipld-resolver/src/arb.rs b/ipld-resolver/src/arb.rs new file mode 100644 index 000000000..ae1a1a7b8 --- /dev/null +++ b/ipld-resolver/src/arb.rs @@ -0,0 +1,66 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use fvm_shared::address::Address; +use ipc_sdk::subnet_id::SubnetID; +use libipld::{Cid, Multihash}; +use quickcheck::Arbitrary; + +/// Unfortunately an arbitrary `DelegatedAddress` can be inconsistent +/// with bytes that do not correspond to its length. This struct fixes +/// that so we can generate arbitrary addresses that don't fail equality +/// after a roundtrip. 
+#[derive(Clone, Debug)] +pub struct ArbAddress(pub Address); + +impl Arbitrary for ArbAddress { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let addr = Address::arbitrary(g); + let bz = addr.to_bytes(); + let addr = Address::from_bytes(&bz).expect("address roundtrip works"); + Self(addr) + } +} + +#[derive(Clone, Debug)] +pub struct ArbSubnetID(pub SubnetID); + +impl Arbitrary for ArbSubnetID { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let child_count = usize::arbitrary(g) % 4; + + let children = (0..child_count) + .map(|_| { + if bool::arbitrary(g) { + Address::new_id(u64::arbitrary(g)) + } else { + // Only expectign EAM managed delegated addresses. + let subaddr: [u8; 20] = std::array::from_fn(|_| Arbitrary::arbitrary(g)); + Address::new_delegated(10, &subaddr).unwrap() + } + }) + .collect::>(); + + Self(SubnetID::new(u64::arbitrary(g), children)) + } +} + +/// Unfortunately ref-fvm depends on cid:0.8.6, which depends on quickcheck:0.9 +/// whereas here we use quickcheck:1.0. This causes conflicts and the `Arbitrary` +/// implementations for `Cid` are not usable to us, nor can we patch all `cid` +/// dependencies to use 0.9 because then the IPLD and other FVM traits don't work. +/// +/// TODO: Remove this module when the `cid` dependency is updated. +/// +/// NOTE: This is based on the [simpler version](https://github.com/ChainSafe/forest/blob/v0.6.0/blockchain/blocks/src/lib.rs) in Forest. +/// The original uses weighted distributions to generate more plausible CIDs. +#[derive(Clone)] +pub struct ArbCid(pub Cid); + +impl Arbitrary for ArbCid { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + Self(Cid::new_v1( + u64::arbitrary(g), + Multihash::wrap(u64::arbitrary(g), &[u8::arbitrary(g)]).unwrap(), + )) + } +} diff --git a/ipld-resolver/src/behaviour/content.rs b/ipld-resolver/src/behaviour/content.rs new file mode 100644 index 000000000..3a2e9caa1 --- /dev/null +++ b/ipld-resolver/src/behaviour/content.rs @@ -0,0 +1,343 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT + +use std::{ + collections::{HashMap, VecDeque}, + task::{Context, Poll}, + time::Duration, +}; + +use libipld::{store::StoreParams, Cid}; +use libp2p::{ + core::ConnectedPoint, + futures::channel::oneshot, + multiaddr::Protocol, + request_response::handler::RequestResponseHandlerEvent, + swarm::{ + derive_prelude::{ConnectionId, FromSwarm}, + ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, + }, + Multiaddr, PeerId, +}; +use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, BitswapStore}; +use log::warn; +use prometheus::Registry; + +use crate::{ + limiter::{RateLimit, RateLimiter}, + stats, +}; + +pub type QueryId = libp2p_bitswap::QueryId; + +// Not much to do here, just hiding the `Progress` event as I don't think we'll need it. +// We can't really turn it into anything more meaningful; the outer Service, which drives +// the Swarm events, will have to store the `QueryId` and figure out which CID it was about +// (there could be multiple queries running over the same CID) and how to respond to the +// original requestor (e.g. by completing a channel). +#[derive(Debug)] +pub enum Event { + /// Event raised when a resolution request is finished. + /// + /// The result will indicate either success, or arbitrary failure. + /// If it is a success, the CID can be found in the [`BitswapStore`] + /// instance the behaviour was created with. 
+ /// + /// Note that it is possible that the synchronization completed + /// partially, but some recursive constituent is missing. The + /// caller can use the [`missing_blocks`] function to check + /// whether a retry is necessary. + Complete(QueryId, anyhow::Result<()>), + + /// Event raised when we want to execute some logic with the `BitswapResponse`. + /// This is only raised if we are tracking rate limits. The service has to + /// do the forwarding between the two oneshot channels, and call this module + /// back between doing so. + BitswapForward { + peer_id: PeerId, + /// Receive response from the [`Bitswap`] behaviour. + /// Normally this goes straight to the handler. + response_rx: oneshot::Receiver, + /// Forward the response to the handler. + response_tx: oneshot::Sender, + }, +} + +/// Configuration for [`content::Behaviour`]. +#[derive(Debug, Clone)] +pub struct Config { + /// Number of bytes that can be consumed remote peers in a time period. + /// + /// 0 means no limit. + pub rate_limit_bytes: u32, + /// Length of the time period at which the consumption limit fills. + /// + /// 0 means no limit. + pub rate_limit_period: Duration, +} + +/// Behaviour built on [`Bitswap`] to resolve IPLD content from [`Cid`] to raw bytes. +pub struct Behaviour { + inner: Bitswap
<P>
, + /// Remember which address peers connected from, so we can apply the rate limit + /// on the address, and not on the peer ID which they can change easily. + peer_addresses: HashMap, + /// Limit the amount of data served by remote address. + rate_limiter: RateLimiter, + rate_limit_period: Duration, + rate_limit: Option, + outbox: VecDeque, +} + +impl Behaviour
<P>
{ + pub fn new(config: Config, store: S) -> Self + where + S: BitswapStore, + { + let bitswap = Bitswap::new(BitswapConfig::default(), store); + let rate_limit = if config.rate_limit_bytes == 0 || config.rate_limit_period.is_zero() { + None + } else { + Some(RateLimit::new( + config.rate_limit_bytes, + config.rate_limit_period, + )) + }; + Self { + inner: bitswap, + peer_addresses: Default::default(), + rate_limiter: RateLimiter::new(config.rate_limit_period), + rate_limit_period: config.rate_limit_period, + rate_limit, + outbox: Default::default(), + } + } + + /// Register Prometheus metrics. + pub fn register_metrics(&self, registry: &Registry) -> anyhow::Result<()> { + self.inner.register_metrics(registry) + } + + /// Recursively resolve a [`Cid`] and all underlying CIDs into blocks. + /// + /// The [`Bitswap`] behaviour will call the [`BitswapStore`] to ask for + /// blocks which are missing, ie. find CIDs which aren't available locally. + /// It is up to the store implementation to decide which links need to be + /// followed. + /// + /// It is also up to the store implementation to decide which CIDs requests + /// to responds to, e.g. if we only want to resolve certain type of content, + /// then the store can look up in a restricted collection, rather than the + /// full IPLD store. + /// + /// Resolution will be attempted from the peers passed to the method, + /// starting with the first one with `WANT-BLOCK`, then whoever responds + /// positively to `WANT-HAVE` requests. The caller should talk to the + /// `membership::Behaviour` first to find suitable peers, and then + /// prioritise peers which are connected. + /// + /// The underlying [`libp2p_request_response::RequestResponse`] behaviour + /// will initiate connections to the peers which aren't connected at the moment. + pub fn resolve(&mut self, cid: Cid, peers: Vec) -> QueryId { + stats::CONTENT_RESOLVE_RUNNING.inc(); + // Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`. + self.inner.sync(cid, peers, [].into_iter()) + } + + /// Check whether the peer has already exhaused their rate limit. + fn check_rate_limit(&mut self, peer_id: &PeerId, cid: &Cid) -> bool { + if let Some(ref rate_limit) = self.rate_limit { + if let Some(addr) = self.peer_addresses.get(peer_id).cloned() { + let bytes = cid.to_bytes().len().try_into().unwrap_or(u32::MAX); + + if !self.rate_limiter.add(rate_limit, addr, bytes) { + return false; + } + } + } + true + } + + /// Callback by the service after [`Event::BitswapForward`]. + pub fn rate_limit_used(&mut self, peer_id: PeerId, bytes: usize) { + if let Some(ref rate_limit) = self.rate_limit { + if let Some(addr) = self.peer_addresses.get(&peer_id).cloned() { + let bytes = bytes.try_into().unwrap_or(u32::MAX); + let _ = self.rate_limiter.add(rate_limit, addr, bytes); + } + } + } + + /// Update the rate limit to a new value, keeping the period as-is. + pub fn update_rate_limit(&mut self, bytes: u32) { + if bytes == 0 || self.rate_limit_period.is_zero() { + self.rate_limit = None; + } else { + self.rate_limit = Some(RateLimit::new(bytes, self.rate_limit_period)) + } + } +} + +impl NetworkBehaviour for Behaviour
<P>
{ + type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ConnectionHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.inner.addresses_of_peer(peer_id) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + // Store the remote address. + match &event { + FromSwarm::ConnectionEstablished(c) => { + if c.other_established == 0 { + let peer_addr = match c.endpoint { + ConnectedPoint::Dialer { + address: listen_addr, + .. + } => listen_addr.clone(), + ConnectedPoint::Listener { + send_back_addr: ephemeral_addr, + .. + } => select_non_ephemeral(ephemeral_addr.clone()), + }; + self.peer_addresses.insert(c.peer_id, peer_addr); + } + } + FromSwarm::ConnectionClosed(c) => { + if c.remaining_established == 0 { + self.peer_addresses.remove(&c.peer_id); + } + } + // Note: Ignoring FromSwarm::AddressChange - as long as the same peer connects, + // not updating the address provides continuity of resource consumption. + _ => {} + } + + self.inner.on_swarm_event(event) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: <::Handler as ConnectionHandler>::OutEvent, + ) { + match event { + RequestResponseHandlerEvent::Request { + request_id, + request, + sender, + } if self.rate_limit.is_some() => { + if !self.check_rate_limit(&peer_id, &request.cid) { + warn!("rate limiting {peer_id}"); + stats::CONTENT_RATE_LIMITED.inc(); + return; + } + // We need to hijack the response channel to record the size, otherwise it goes straight to the handler. + let (tx, rx) = libp2p::futures::channel::oneshot::channel(); + let event = RequestResponseHandlerEvent::Request { + request_id, + request, + sender: tx, + }; + + self.inner + .on_connection_handler_event(peer_id, connection_id, event); + + let forward = Event::BitswapForward { + peer_id, + response_rx: rx, + response_tx: sender, + }; + self.outbox.push_back(forward); + } + _ => self + .inner + .on_connection_handler_event(peer_id, connection_id, event), + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { + // Emit own events first. + if let Some(ev) = self.outbox.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + // Poll Bitswap. + while let Poll::Ready(ev) = self.inner.poll(cx, params) { + match ev { + NetworkBehaviourAction::GenerateEvent(ev) => match ev { + BitswapEvent::Progress(_, _) => {} + BitswapEvent::Complete(id, result) => { + stats::CONTENT_RESOLVE_RUNNING.dec(); + let out = Event::Complete(id, result); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + }, + other => { + return Poll::Ready(other.map_out(|_| unreachable!("already handled"))); + } + } + } + + Poll::Pending + } +} + +/// Get rid of parts of an address which are considered ephemeral, +/// keeping just the parts which would stay the same if for example +/// the same peer opened another connection from a different random port. +fn select_non_ephemeral(mut addr: Multiaddr) -> Multiaddr { + let mut keep = Vec::new(); + while let Some(proto) = addr.pop() { + match proto { + // Some are valid on their own right. + Protocol::Ip4(_) | Protocol::Ip6(_) => { + keep.clear(); + keep.push(proto); + break; + } + // Skip P2P peer ID, they might use a different identity. + Protocol::P2p(_) => {} + // Skip ephemeral parts. 
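+            // TCP/UDP port numbers are what typically change when the same peer reconnects.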
+ Protocol::Tcp(_) | Protocol::Udp(_) => {} + // Everything else we keep until we see better options. + _ => { + keep.push(proto); + } + } + } + keep.reverse(); + Multiaddr::from_iter(keep) +} + +#[cfg(test)] +mod tests { + use libp2p::Multiaddr; + + use super::select_non_ephemeral; + + #[test] + fn non_ephemeral_addr() { + let examples = [ + ("/ip4/127.0.0.1/udt/sctp/5678", "/ip4/127.0.0.1"), + ("/ip4/95.217.194.97/tcp/8008/p2p/12D3KooWC1EaEEpghwnPdd89LaPTKEweD1PRLz4aRBkJEA9UiUuS", "/ip4/95.217.194.97"), + ("/udt/memory/10/p2p/12D3KooWC1EaEEpghwnPdd89LaPTKEweD1PRLz4aRBkJEA9UiUuS", "/udt/memory/10") + ]; + + for (addr, exp) in examples { + let addr: Multiaddr = addr.parse().unwrap(); + let exp: Multiaddr = exp.parse().unwrap(); + assert_eq!(select_non_ephemeral(addr), exp); + } + } +} diff --git a/ipld-resolver/src/behaviour/discovery.rs b/ipld-resolver/src/behaviour/discovery.rs new file mode 100644 index 000000000..875154707 --- /dev/null +++ b/ipld-resolver/src/behaviour/discovery.rs @@ -0,0 +1,364 @@ +// Copyright 2022-2023 Protocol Labs +// Copyright 2019-2022 ChainSafe Systems +// SPDX-License-Identifier: MIT +use std::{ + borrow::Cow, + cmp, + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; + +use libp2p::{ + core::connection::ConnectionId, + identify::Info, + kad::{ + handler::KademliaHandlerProto, store::MemoryStore, InboundRequest, Kademlia, + KademliaConfig, KademliaEvent, KademliaStoreInserts, QueryId, QueryResult, + }, + multiaddr::Protocol, + swarm::{ + behaviour::toggle::{Toggle, ToggleIntoConnectionHandler}, + derive_prelude::FromSwarm, + ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, + }, + Multiaddr, PeerId, +}; +use log::{debug, warn}; +use tokio::time::Interval; + +use crate::stats; + +use super::NetworkConfig; + +// NOTE: The Discovery behaviour is largely based on what exists in Forest. If it ain't broken... +// NOTE: Not sure if emitting events is going to be useful yet, but for now it's an example of having one. + +/// Event generated by the `Discovery` behaviour. +#[derive(Debug)] +pub enum Event { + /// Event emitted when a peer is added or updated in the routing table, + /// which means if we later ask for its addresses, they should be known. + Added(PeerId, Vec), + + /// Event emitted when a peer is removed from the routing table. + Removed(PeerId), +} + +/// Configuration for [`discovery::Behaviour`]. +#[derive(Clone, Debug)] +pub struct Config { + /// Custom nodes which never expire, e.g. bootstrap or reserved nodes. + /// + /// The addresses must end with a `/p2p/` part. + pub static_addresses: Vec, + /// Number of connections at which point we pause further discovery lookups. + pub target_connections: usize, + /// Option to disable Kademlia, for example in a fixed static network. + pub enable_kademlia: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum ConfigError { + #[error("invalid network: {0}")] + InvalidNetwork(String), + #[error("invalid bootstrap address: {0}")] + InvalidBootstrapAddress(Multiaddr), + #[error("no bootstrap address")] + NoBootstrapAddress, +} + +/// Discovery behaviour, periodically running a random lookup with Kademlia to find new peers. +/// +/// Our other option for peer discovery would be to rely on the Peer Exchange of Gossipsub. +/// However, the required Signed Records feature is not available in the Rust version of the library, as of v0.50. +pub struct Behaviour { + /// Local peer ID. 
+ peer_id: PeerId, + /// User-defined list of nodes and their addresses. + /// Typically includes bootstrap nodes, or it can be used for a static network. + static_addresses: Vec<(PeerId, Multiaddr)>, + /// Name of the peer discovery protocol. + protocol_name: String, + /// Kademlia behaviour, if enabled. + inner: Toggle>, + /// Number of current connections. + num_connections: usize, + /// Number of connections where further lookups are paused. + target_connections: usize, + /// Interval between random lookups. + lookup_interval: Interval, + /// Buffer incoming identify requests until we have finished the bootstrap. + bootstrap_buffer: Option>, + /// Events to return when polled. + outbox: VecDeque, +} + +impl Behaviour { + /// Create a [`discovery::Behaviour`] from the configuration. + pub fn new(nc: NetworkConfig, dc: Config) -> Result { + if nc.network_name.is_empty() { + return Err(ConfigError::InvalidNetwork(nc.network_name)); + } + + // Parse static addresses. + let mut static_addresses = Vec::new(); + for multiaddr in dc.static_addresses { + let mut addr = multiaddr.clone(); + if let Some(Protocol::P2p(mh)) = addr.pop() { + let peer_id = PeerId::from_multihash(mh).unwrap(); + static_addresses.push((peer_id, addr)) + } else { + return Err(ConfigError::InvalidBootstrapAddress(multiaddr)); + } + } + + let mut outbox = VecDeque::new(); + let protocol_name = format!("/ipc/{}/kad/1.0.0", nc.network_name); + + let mut bootstrap_buffer = None; + + let kademlia_opt = if dc.enable_kademlia { + let mut kad_config = KademliaConfig::default(); + kad_config.set_protocol_names(vec![Cow::Owned(protocol_name.as_bytes().to_vec())]); + + // Disable inserting records into the memory store, so peers cannot send `PutRecord` + // messages to store content in the memory of our node. + kad_config.set_record_filtering(KademliaStoreInserts::FilterBoth); + + let store = MemoryStore::new(nc.local_peer_id()); + + let mut kademlia = Kademlia::with_config(nc.local_peer_id(), store, kad_config); + + // Bootstrap from the seeds. The first seed to stand up might have nobody to bootstrap from, + // although ideally there would be at least another peer, so we can easily restart it and come back. + if !static_addresses.is_empty() { + for (peer_id, addr) in static_addresses.iter() { + kademlia.add_address(peer_id, addr.clone()); + } + kademlia + .bootstrap() + .map_err(|_| ConfigError::NoBootstrapAddress)?; + + bootstrap_buffer = Some(Vec::new()); + } + + Some(kademlia) + } else { + // It would be nice to use `.group_by` here but it's unstable. + // Make sure static peers are reported as routable. + for (peer_id, addr) in static_addresses.iter() { + outbox.push_back(Event::Added(*peer_id, vec![addr.clone()])) + } + None + }; + + Ok(Self { + peer_id: nc.local_peer_id(), + static_addresses, + protocol_name, + inner: kademlia_opt.into(), + lookup_interval: tokio::time::interval(Duration::from_secs(1)), + outbox, + num_connections: 0, + bootstrap_buffer, + target_connections: dc.target_connections, + }) + } + + /// Lookup a peer, unless we already know their address, so that we have a chance to connect to them later. + pub fn background_lookup(&mut self, peer_id: PeerId) { + if self.addresses_of_peer(&peer_id).is_empty() { + if let Some(kademlia) = self.inner.as_mut() { + stats::DISCOVERY_BACKGROUND_LOOKUP.inc(); + kademlia.get_closest_peers(peer_id); + } + } + } + + /// Check if a peer has a user defined addresses. 
+ fn is_static(&self, peer_id: PeerId) -> bool { + self.static_addresses.iter().any(|(id, _)| *id == peer_id) + } + + /// Add addresses we learned from the `Identify` protocol to Kademlia. + /// + /// This seems to be the only way, because Kademlia rightfully treats + /// incoming connections as ephemeral addresses, but doesn't have an + /// alternative exchange mechanism. + pub fn add_identified(&mut self, peer_id: &PeerId, info: Info) { + if info.protocols.contains(&self.protocol_name) { + // If we are still in the process of bootstrapping peers, buffer the incoming self-identify records, + // to protect against eclipse attacks that could fill the k-table with entries to crowd out honest peers. + if let Some(buffer) = self.bootstrap_buffer.as_mut() { + if buffer.len() < self.target_connections + && !buffer.iter().any(|(id, _)| id == peer_id) + { + buffer.push((*peer_id, info)) + } + } else { + for addr in info.listen_addrs.iter().cloned() { + self.add_address(peer_id, addr); + } + } + } + } + + /// Add a known address to Kademlia. + pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { + if let Some(kademlia) = self.inner.as_mut() { + kademlia.add_address(peer_id, address); + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = ToggleIntoConnectionHandler>; + + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ConnectionHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + let mut addrs = self + .static_addresses + .iter() + .filter(|(p, _)| p == peer_id) + .map(|(_, a)| a.clone()) + .collect::>(); + + addrs.extend(self.inner.addresses_of_peer(peer_id)); + addrs + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match &event { + FromSwarm::ConnectionEstablished(e) => { + if e.other_established == 0 { + stats::DISCOVERY_CONNECTED_PEERS.inc(); + self.num_connections += 1; + } + } + FromSwarm::ConnectionClosed(e) => { + if e.remaining_established == 0 { + stats::DISCOVERY_CONNECTED_PEERS.dec(); + self.num_connections -= 1; + } + } + _ => {} + }; + self.inner.on_swarm_event(event) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: <::Handler as ConnectionHandler>::OutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection_id, event) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> std::task::Poll> { + // Emit own events first. + if let Some(ev) = self.outbox.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + + // Trigger periodic queries. + if self.lookup_interval.poll_tick(cx).is_ready() { + if self.num_connections < self.target_connections { + if let Some(k) = self.inner.as_mut() { + debug!("looking up a random peer"); + let random_peer_id = PeerId::random(); + k.get_closest_peers(random_peer_id); + } + } + + // Schedule the next random query with exponentially increasing delay, capped at 60 seconds. + self.lookup_interval = tokio::time::interval(cmp::min( + self.lookup_interval.period() * 2, + Duration::from_secs(60), + )); + // we need to reset the interval, otherwise the next tick completes immediately. + self.lookup_interval.reset(); + } + + // Poll Kademlia. + while let Poll::Ready(ev) = self.inner.poll(cx, params) { + match ev { + NetworkBehaviourAction::GenerateEvent(ev) => { + match ev { + // We get this event for inbound connections, where the remote address may be ephemeral. 
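+                        // The peer may still become routable later, once Identify reports a proper listen address and we add it to Kademlia.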
+ KademliaEvent::UnroutablePeer { peer } => { + debug!("{peer} unroutable from {}", self.peer_id); + } + KademliaEvent::InboundRequest { + request: InboundRequest::PutRecord { source, .. }, + } => { + warn!("disallowed Kademlia requests from {source}",) + } + // Information only. + KademliaEvent::InboundRequest { .. } => {} + // Finish bootstrapping. + KademliaEvent::OutboundQueryProgressed { result, step, .. } => match result + { + QueryResult::Bootstrap(result) if step.last => { + debug!("Bootstrapping finished with {result:?}"); + if let Some(buffer) = self.bootstrap_buffer.take() { + debug!("Adding {} self-identified peers.", buffer.len()); + for (peer_id, info) in buffer { + self.add_identified(&peer_id, info) + } + } + } + _ => {} + }, + // The config ensures peers are added to the table if there's room. + // We're not emitting these as known peers because the address will probably not be returned by `addresses_of_peer`, + // so the outside service would have to keep track, which is not what we want. + KademliaEvent::RoutablePeer { peer, .. } => { + debug!("Kademlia in manual mode or bucket full, cannot add {peer}"); + } + // Unfortunately, looking at the Kademlia behaviour, it looks like when it goes from pending to active, + // it won't emit another event, so we might as well tentatively emit an event here. + KademliaEvent::PendingRoutablePeer { peer, address } => { + debug!("{peer} pending to the routing table of {}", self.peer_id); + self.outbox.push_back(Event::Added(peer, vec![address])) + } + // This event should ensure that we will be able to answer address lookups later. + KademliaEvent::RoutingUpdated { + peer, + addresses, + old_peer, + .. + } => { + debug!("{peer} added to the routing table of {}", self.peer_id); + // There are two events here; we can only return one, so let's defer them to the outbox. 
+ if let Some(peer_id) = old_peer { + if self.is_static(peer_id) { + self.outbox.push_back(Event::Removed(peer_id)) + } + } + self.outbox + .push_back(Event::Added(peer, addresses.into_vec())) + } + } + } + other => { + return Poll::Ready(other.map_out(|_| unreachable!("already handled"))); + } + } + } + + Poll::Pending + } +} diff --git a/ipld-resolver/src/behaviour/membership.rs b/ipld-resolver/src/behaviour/membership.rs new file mode 100644 index 000000000..0923459a1 --- /dev/null +++ b/ipld-resolver/src/behaviour/membership.rs @@ -0,0 +1,602 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use std::collections::{HashMap, HashSet, VecDeque}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use anyhow::anyhow; +use ipc_sdk::subnet_id::SubnetID; +use libp2p::core::connection::ConnectionId; +use libp2p::gossipsub::error::SubscriptionError; +use libp2p::gossipsub::{ + GossipsubConfigBuilder, GossipsubEvent, GossipsubMessage, IdentTopic, MessageAuthenticity, + MessageId, Sha256Topic, Topic, TopicHash, +}; +use libp2p::identity::Keypair; +use libp2p::swarm::derive_prelude::FromSwarm; +use libp2p::swarm::{NetworkBehaviourAction, PollParameters}; +use libp2p::Multiaddr; +use libp2p::{ + gossipsub::Gossipsub, + swarm::{ConnectionHandler, IntoConnectionHandler, NetworkBehaviour}, + PeerId, +}; +use log::{debug, error, warn}; +use tokio::time::{Instant, Interval}; + +use crate::hash::blake2b_256; +use crate::provider_cache::{ProviderDelta, SubnetProviderCache}; +use crate::provider_record::{ProviderRecord, SignedProviderRecord}; +use crate::vote_record::{SignedVoteRecord, VoteRecord}; +use crate::{stats, Timestamp}; + +use super::NetworkConfig; + +/// `Gossipsub` topic identifier for subnet membership. +const PUBSUB_MEMBERSHIP: &str = "/ipc/membership"; +/// `Gossipsub` topic identifier for voting about content. +const PUBSUB_VOTES: &str = "/ipc/ipld/votes"; +/// `Gossipsub` topic identifier for pre-emptively published blocks of data. +const PUBSUB_PREEMPTIVE: &str = "/ipc/ipld/pre-emptive"; + +/// Events emitted by the [`membership::Behaviour`] behaviour. +#[derive(Debug)] +pub enum Event { + /// Indicate a change in the subnets a peer is known to support. + Updated(PeerId, ProviderDelta), + + /// Indicate that we no longer treat a peer as routable and removed all their supported subnet associations. + Removed(PeerId), + + /// We could not add a provider record to the cache because the chache hasn't + /// been told yet that the provider peer is routable. This event can be used + /// to trigger a lookup by the discovery module to learn the address. + Skipped(PeerId), + + /// We received a [`VoteRecord`] in one of the subnets we are providing data for. + ReceivedVote(Box), + + /// We received preemptive data published in a subnet we were interested in. + ReceivedPreemptive(SubnetID, Vec), +} + +/// Configuration for [`membership::Behaviour`]. +#[derive(Clone, Debug)] +pub struct Config { + /// User defined list of subnets which will never be pruned from the cache. + pub static_subnets: Vec, + /// Maximum number of subnets to track in the cache. + pub max_subnets: usize, + /// Publish interval for supported subnets. + pub publish_interval: Duration, + /// Minimum time between publishing own provider record in reaction to new joiners. + pub min_time_between_publish: Duration, + /// Maximum age of provider records before the peer is removed without an update. 
+ pub max_provider_age: Duration, +} + +#[derive(thiserror::Error, Debug)] +pub enum ConfigError { + #[error("invalid network: {0}")] + InvalidNetwork(String), + #[error("invalid gossipsub config: {0}")] + InvalidGossipsubConfig(String), + #[error("error subscribing to topic")] + Subscription(#[from] SubscriptionError), +} + +/// A [`NetworkBehaviour`] internally using [`Gossipsub`] to learn which +/// peer is able to resolve CIDs in different subnets. +pub struct Behaviour { + /// [`Gossipsub`] behaviour to spread the information about subnet membership. + inner: Gossipsub, + /// Events to return when polled. + outbox: VecDeque, + /// [`Keypair`] used to sign [`SignedProviderRecord`] instances. + local_key: Keypair, + /// Name of the P2P network, used to separate `Gossipsub` topics. + network_name: String, + /// Name of the [`Gossipsub`] topic where subnet memberships are published. + membership_topic: IdentTopic, + /// List of subnet IDs this agent is providing data for. + subnet_ids: Vec, + /// Voting topics we are currently subscribed to. + voting_topics: HashSet, + /// Remember which subnet a topic was about. + preemptive_topics: HashMap, + /// Caching the latest state of subnet providers. + provider_cache: SubnetProviderCache, + /// Interval between publishing the currently supported subnets. + /// + /// This acts like a heartbeat; if a peer doesn't publish its snapshot for a long time, + /// other agents can prune it from their cache and not try to contact for resolution. + publish_interval: Interval, + /// Minimum time between publishing own provider record in reaction to new joiners. + min_time_between_publish: Duration, + /// Last time we gossiped our own provider record. + last_publish_timestamp: Timestamp, + /// Next time we will gossip our own provider record. + next_publish_timestamp: Timestamp, + /// Maximum time a provider can be without an update before it's pruned from the cache. + max_provider_age: Duration, +} + +impl Behaviour { + pub fn new(nc: NetworkConfig, mc: Config) -> Result { + if nc.network_name.is_empty() { + return Err(ConfigError::InvalidNetwork(nc.network_name)); + } + let membership_topic = Topic::new(format!("{}/{}", PUBSUB_MEMBERSHIP, nc.network_name)); + + let mut gossipsub_config = GossipsubConfigBuilder::default(); + // Set the maximum message size to 2MB. + gossipsub_config.max_transmit_size(2 << 20); + gossipsub_config.message_id_fn(|msg: &GossipsubMessage| { + let s = blake2b_256(&msg.data); + MessageId::from(s) + }); + + let gossipsub_config = gossipsub_config + .build() + .map_err(|s| ConfigError::InvalidGossipsubConfig(s.into()))?; + + let mut gossipsub = Gossipsub::new( + MessageAuthenticity::Signed(nc.local_key.clone()), + gossipsub_config, + ) + .map_err(|s| ConfigError::InvalidGossipsubConfig(s.into()))?; + + gossipsub + .with_peer_score( + scoring::build_peer_score_params(membership_topic.clone()), + scoring::build_peer_score_thresholds(), + ) + .map_err(ConfigError::InvalidGossipsubConfig)?; + + // Subscribe to the topic. + gossipsub.subscribe(&membership_topic)?; + + // Don't publish immediately, it's empty. Let the creator call `set_subnet_ids` to trigger initially. 
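+        // (The public method that triggers this below is `set_provided_subnets`.)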
+ let mut interval = tokio::time::interval(mc.publish_interval); + interval.reset(); + + // Not passing static subnets here; using pinning below instead so it subscribes as well + let provider_cache = SubnetProviderCache::new(mc.max_subnets, vec![]); + + let mut membership = Self { + inner: gossipsub, + outbox: Default::default(), + local_key: nc.local_key, + network_name: nc.network_name, + membership_topic, + subnet_ids: Default::default(), + voting_topics: Default::default(), + preemptive_topics: Default::default(), + provider_cache, + publish_interval: interval, + min_time_between_publish: mc.min_time_between_publish, + last_publish_timestamp: Timestamp::default(), + next_publish_timestamp: Timestamp::now() + mc.publish_interval, + max_provider_age: mc.max_provider_age, + }; + + for subnet_id in mc.static_subnets { + membership.pin_subnet(subnet_id)?; + } + + Ok(membership) + } + + /// Construct the topic used to gossip about pre-emptively published data. + /// + /// Replaces "/" with "_" to avoid clashes from prefix/suffix overlap. + fn preemptive_topic(&self, subnet_id: &SubnetID) -> Sha256Topic { + Topic::new(format!( + "{}/{}/{}", + PUBSUB_PREEMPTIVE, + self.network_name.replace('/', "_"), + subnet_id.to_string().replace('/', "_") + )) + } + + /// Subscribe to a preemptive topic. + fn preemptive_subscribe(&mut self, subnet_id: SubnetID) -> Result<(), SubscriptionError> { + let topic = self.preemptive_topic(&subnet_id); + self.inner.subscribe(&topic)?; + self.preemptive_topics.insert(topic.hash(), subnet_id); + Ok(()) + } + + /// Subscribe to a preemptive topic. + fn preemptive_unsubscribe(&mut self, subnet_id: &SubnetID) -> anyhow::Result<()> { + let topic = self.preemptive_topic(subnet_id); + self.inner.unsubscribe(&topic)?; + self.preemptive_topics.remove(&topic.hash()); + Ok(()) + } + + /// Construct the topic used to gossip about votes. + /// + /// Replaces "/" with "_" to avoid clashes from prefix/suffix overlap. + fn voting_topic(&self, subnet_id: &SubnetID) -> Sha256Topic { + Topic::new(format!( + "{}/{}/{}", + PUBSUB_VOTES, + self.network_name.replace('/', "_"), + subnet_id.to_string().replace('/', "_") + )) + } + + /// Subscribe to a voting topic. + fn voting_subscribe(&mut self, subnet_id: &SubnetID) -> Result<(), SubscriptionError> { + let topic = self.voting_topic(subnet_id); + self.inner.subscribe(&topic)?; + self.voting_topics.insert(topic.hash()); + Ok(()) + } + + /// Unsubscribe from a voting topic. + fn voting_unsubscribe(&mut self, subnet_id: &SubnetID) -> anyhow::Result<()> { + let topic = self.voting_topic(subnet_id); + self.inner.unsubscribe(&topic)?; + self.voting_topics.remove(&topic.hash()); + Ok(()) + } + + /// Set all the currently supported subnet IDs, then publish the updated list. + pub fn set_provided_subnets(&mut self, subnet_ids: Vec) -> anyhow::Result<()> { + let old_subnet_ids = std::mem::take(&mut self.subnet_ids); + // Unsubscribe from removed. + for subnet_id in old_subnet_ids.iter() { + if !subnet_ids.contains(subnet_id) { + self.voting_unsubscribe(subnet_id)?; + } + } + // Subscribe to added. + for subnet_id in subnet_ids.iter() { + if !old_subnet_ids.contains(subnet_id) { + self.voting_subscribe(subnet_id)?; + } + } + self.subnet_ids = subnet_ids; + self.publish_membership() + } + + /// Add a subnet to the list of supported subnets, then publish the updated list. 
+ pub fn add_provided_subnet(&mut self, subnet_id: SubnetID) -> anyhow::Result<()> { + if self.subnet_ids.contains(&subnet_id) { + return Ok(()); + } + self.voting_subscribe(&subnet_id)?; + self.subnet_ids.push(subnet_id); + self.publish_membership() + } + + /// Remove a subnet from the list of supported subnets, then publish the updated list. + pub fn remove_provided_subnet(&mut self, subnet_id: SubnetID) -> anyhow::Result<()> { + if !self.subnet_ids.contains(&subnet_id) { + return Ok(()); + } + self.voting_unsubscribe(&subnet_id)?; + self.subnet_ids.retain(|id| id != &subnet_id); + self.publish_membership() + } + + /// Make sure a subnet is not pruned, so we always track its providers. + /// Also subscribe to pre-emptively published blocks of data. + /// + /// This method could be called in a parent subnet when the ledger indicates + /// there is a known child subnet, so we make sure this subnet cannot be + /// crowded out during the initial phase of bootstrapping the network. + pub fn pin_subnet(&mut self, subnet_id: SubnetID) -> Result<(), SubscriptionError> { + self.preemptive_subscribe(subnet_id.clone())?; + self.provider_cache.pin_subnet(subnet_id); + Ok(()) + } + + /// Make a subnet pruneable and unsubscribe from pre-emptive data. + pub fn unpin_subnet(&mut self, subnet_id: &SubnetID) -> anyhow::Result<()> { + self.preemptive_unsubscribe(subnet_id)?; + self.provider_cache.unpin_subnet(subnet_id); + Ok(()) + } + + /// Send a message through Gossipsub to let everyone know about the current configuration. + fn publish_membership(&mut self) -> anyhow::Result<()> { + let record = ProviderRecord::signed(&self.local_key, self.subnet_ids.clone())?; + let data = record.into_envelope().into_protobuf_encoding(); + match self.inner.publish(self.membership_topic.clone(), data) { + Err(e) => { + stats::MEMBERSHIP_PUBLISH_FAILURE.inc(); + Err(anyhow!(e)) + } + Ok(_msg_id) => { + stats::MEMBERSHIP_PUBLISH_SUCCESS.inc(); + self.last_publish_timestamp = Timestamp::now(); + self.next_publish_timestamp = + self.last_publish_timestamp + self.publish_interval.period(); + self.publish_interval.reset(); // In case the change wasn't tiggered by the schedule. + Ok(()) + } + } + } + + /// Publish the vote of the validator running the agent about a CID to a subnet. + pub fn publish_vote(&mut self, vote: SignedVoteRecord) -> anyhow::Result<()> { + let topic = self.voting_topic(&vote.record().subnet_id); + let data = vote.into_envelope().into_protobuf_encoding(); + match self.inner.publish(topic, data) { + Err(e) => { + stats::MEMBERSHIP_PUBLISH_FAILURE.inc(); + Err(anyhow!(e)) + } + Ok(_msg_id) => { + stats::MEMBERSHIP_PUBLISH_SUCCESS.inc(); + Ok(()) + } + } + } + + /// Publish arbitrary data to the pre-emptive topic of a subnet. + /// + /// We are not expected to be subscribed to this topic, only agents on the parent subnet are. + pub fn publish_preemptive(&mut self, subnet_id: SubnetID, data: Vec) -> anyhow::Result<()> { + let topic = self.preemptive_topic(&subnet_id); + match self.inner.publish(topic, data) { + Err(e) => { + stats::MEMBERSHIP_PUBLISH_FAILURE.inc(); + Err(anyhow!(e)) + } + Ok(_msg_id) => { + stats::MEMBERSHIP_PUBLISH_SUCCESS.inc(); + Ok(()) + } + } + } + + /// Mark a peer as routable in the cache. + /// + /// Call this method when the discovery service learns the address of a peer. 
+ pub fn set_routable(&mut self, peer_id: PeerId) { + self.provider_cache.set_routable(peer_id); + stats::MEMBERSHIP_ROUTABLE_PEERS + .set(self.provider_cache.num_routable().try_into().unwrap()); + self.publish_for_new_peer(peer_id); + } + + /// Mark a peer as unroutable in the cache. + /// + /// Call this method when the discovery service forgets the address of a peer. + pub fn set_unroutable(&mut self, peer_id: PeerId) { + self.provider_cache.set_unroutable(peer_id); + self.outbox.push_back(Event::Removed(peer_id)) + } + + /// List the current providers of a subnet. + /// + /// Call this method when looking for a peer to resolve content from. + pub fn providers_of_subnet(&self, subnet_id: &SubnetID) -> Vec { + self.provider_cache.providers_of_subnet(subnet_id) + } + + /// Parse and handle a [`GossipsubMessage`]. If it's from the expected topic, + /// then raise domain event to let the rest of the application know about a + /// provider. Also update all the book keeping in the behaviour that we use + /// to answer future queries about the topic. + fn handle_message(&mut self, msg: GossipsubMessage) { + if msg.topic == self.membership_topic.hash() { + match SignedProviderRecord::from_bytes(&msg.data).map(|r| r.into_record()) { + Ok(record) => self.handle_provider_record(record), + Err(e) => { + stats::MEMBERSHIP_INVALID_MESSAGE.inc(); + warn!( + "Gossip message from peer {:?} could not be deserialized as ProviderRecord: {e}", + msg.source + ); + } + } + } else if self.voting_topics.contains(&msg.topic) { + match SignedVoteRecord::from_bytes(&msg.data).map(|r| r.into_record()) { + Ok(record) => self.handle_vote_record(record), + Err(e) => { + stats::MEMBERSHIP_INVALID_MESSAGE.inc(); + warn!( + "Gossip message from peer {:?} could not be deserialized as VoteRecord: {e}", + msg.source + ); + } + } + } else if let Some(subnet_id) = self.preemptive_topics.get(&msg.topic) { + self.handle_preemptive_data(subnet_id.clone(), msg.data) + } else { + stats::MEMBERSHIP_UNKNOWN_TOPIC.inc(); + warn!( + "unknown gossipsub topic in message from {:?}: {}", + msg.source, msg.topic + ); + } + } + + /// Try to add a provider record to the cache. + /// + /// If this is the first time we receive a record from the peer, + /// reciprocate by publishing our own. + fn handle_provider_record(&mut self, record: ProviderRecord) { + let (event, publish) = match self.provider_cache.add_provider(&record) { + None => { + stats::MEMBERSHIP_SKIPPED_PEERS.inc(); + (Some(Event::Skipped(record.peer_id)), false) + } + Some(d) if d.is_empty() && !d.is_new => (None, false), + Some(d) => { + let publish = d.is_new; + (Some(Event::Updated(record.peer_id, d)), publish) + } + }; + + if let Some(event) = event { + self.outbox.push_back(event); + } + + if publish { + stats::MEMBERSHIP_PROVIDER_PEERS.inc(); + self.publish_for_new_peer(record.peer_id) + } + } + + /// Raise an event to tell we received a new vote. + fn handle_vote_record(&mut self, record: VoteRecord) { + self.outbox.push_back(Event::ReceivedVote(Box::new(record))) + } + + fn handle_preemptive_data(&mut self, subnet_id: SubnetID, data: Vec) { + self.outbox + .push_back(Event::ReceivedPreemptive(subnet_id, data)) + } + + /// Handle new subscribers to the membership topic. + fn handle_subscriber(&mut self, peer_id: PeerId, topic: TopicHash) { + if topic == self.membership_topic.hash() { + self.publish_for_new_peer(peer_id) + } + } + + /// Publish our provider record when we encounter a new peer, unless we have recently done so. 
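+ ///
+ /// Three cases are handled below: we published very recently (skip), the next scheduled
+ /// publish is already close enough (let the regular timer fire), or we bring the next
+ /// publish forward by re-creating the interval with a short delay.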
+ fn publish_for_new_peer(&mut self, peer_id: PeerId) { + if self.subnet_ids.is_empty() { + // We have nothing, so there's no need for them to know this ASAP. + // The reason we shouldn't disable periodic publishing of empty + // records completely is because it would also remove one of + // triggers for non-connected peers to eagerly publish their + // subnets when they see our empty records. Plus they could + // be good to show on metrics, to have a single source of + // the cluster size available on any node. + return; + } + let now = Timestamp::now(); + if self.last_publish_timestamp > now - self.min_time_between_publish { + debug!("recently published, not publishing again for peer {peer_id}"); + } else if self.next_publish_timestamp <= now + self.min_time_between_publish { + debug!("publishing soon for new peer {peer_id}"); // don't let new joiners delay it forever by hitting the next block + } else { + debug!("publishing for new peer {peer_id}"); + // Create a new timer, rather than publish and reset. This way we don't repeat error handling. + // Give some time for Kademlia and Identify to do their bit on both sides. Works better in tests. + let delayed = Instant::now() + self.min_time_between_publish; + self.next_publish_timestamp = now + self.min_time_between_publish; + self.publish_interval = + tokio::time::interval_at(delayed, self.publish_interval.period()) + } + } + + /// Remove any membership record that hasn't been updated for a long time. + fn prune_membership(&mut self) { + let cutoff_timestamp = Timestamp::now() - self.max_provider_age; + let pruned = self.provider_cache.prune_providers(cutoff_timestamp); + for peer_id in pruned { + stats::MEMBERSHIP_PROVIDER_PEERS.dec(); + self.outbox.push_back(Event::Removed(peer_id)) + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = ::ConnectionHandler; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ConnectionHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.inner.addresses_of_peer(peer_id) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.inner.on_swarm_event(event) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: <::Handler as ConnectionHandler>::OutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection_id, event) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> std::task::Poll> { + // Emit own events first. + if let Some(ev) = self.outbox.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + + // Republish our current peer record snapshot and prune old records. + if self.publish_interval.poll_tick(cx).is_ready() { + if let Err(e) = self.publish_membership() { + warn!("failed to publish membership: {e}") + }; + self.prune_membership(); + } + + // Poll Gossipsub for events; this is where we can handle Gossipsub messages and + // store the associations from peers to subnets. + while let Poll::Ready(ev) = self.inner.poll(cx, params) { + match ev { + NetworkBehaviourAction::GenerateEvent(ev) => { + match ev { + // NOTE: We could (ab)use the Gossipsub mechanism itself to signal subnet membership, + // however I think the information would only spread to our nearest neighbours we are + // connected to. 
If we assume there are hundreds of agents in each subnet which may + // or may not overlap, and each agent is connected to ~50 other agents, then the chance + // that there are subnets from which there are no or just a few connections is not + // insignificant. For this reason I oped to use messages instead, and let the content + // carry the information, spreading through the Gossipsub network regardless of the + // number of connected peers. + GossipsubEvent::Subscribed { peer_id, topic } => { + self.handle_subscriber(peer_id, topic) + } + + GossipsubEvent::Unsubscribed { .. } => {} + // Log potential misconfiguration. + GossipsubEvent::GossipsubNotSupported { peer_id } => { + debug!("peer {peer_id} doesn't support gossipsub"); + } + GossipsubEvent::Message { message, .. } => { + self.handle_message(message); + } + } + } + other => { + return Poll::Ready(other.map_out(|_| unreachable!("already handled"))); + } + } + } + + Poll::Pending + } +} + +// Forest has Filecoin specific values copied from Lotus. Not sure what values to use, +// so I'll leave everything on default for now. Or maybe they should be left empty? +mod scoring { + + use libp2p::gossipsub::{IdentTopic, PeerScoreParams, PeerScoreThresholds, TopicScoreParams}; + + pub fn build_peer_score_params(membership_topic: IdentTopic) -> PeerScoreParams { + let mut params = PeerScoreParams::default(); + params + .topics + .insert(membership_topic.hash(), TopicScoreParams::default()); + params + } + + pub fn build_peer_score_thresholds() -> PeerScoreThresholds { + PeerScoreThresholds::default() + } +} diff --git a/ipld-resolver/src/behaviour/mod.rs b/ipld-resolver/src/behaviour/mod.rs new file mode 100644 index 000000000..a94339761 --- /dev/null +++ b/ipld-resolver/src/behaviour/mod.rs @@ -0,0 +1,100 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use libipld::store::StoreParams; +use libp2p::{ + identify, + identity::{Keypair, PublicKey}, + ping, + swarm::NetworkBehaviour, + PeerId, +}; +use libp2p_bitswap::BitswapStore; + +pub mod content; +pub mod discovery; +pub mod membership; + +pub use content::Config as ContentConfig; +pub use discovery::Config as DiscoveryConfig; +pub use membership::Config as MembershipConfig; + +#[derive(Clone, Debug)] +pub struct NetworkConfig { + /// Cryptographic key used to sign messages. + pub local_key: Keypair, + /// Network name to be differentiate this peer group. + pub network_name: String, +} + +impl NetworkConfig { + pub fn local_public_key(&self) -> PublicKey { + self.local_key.public() + } + pub fn local_peer_id(&self) -> PeerId { + self.local_public_key().to_peer_id() + } +} + +#[derive(thiserror::Error, Debug)] +pub enum ConfigError { + #[error("Error in the discovery configuration")] + Discovery(#[from] discovery::ConfigError), + #[error("Error in the membership configuration")] + Membership(#[from] membership::ConfigError), +} + +/// Libp2p behaviour bundle to manage content resolution from other subnets, using: +/// +/// * Kademlia for peer discovery +/// * Gossipsub to advertise subnet membership +/// * Bitswap to resolve CIDs +#[derive(NetworkBehaviour)] +pub struct Behaviour { + ping: ping::Behaviour, + identify: identify::Behaviour, + discovery: discovery::Behaviour, + membership: membership::Behaviour, + content: content::Behaviour
<P>
, +} + +// Unfortunately, by using `#[derive(NetworkBehaviour)]` we cannot easily inspect events +// from the inner behaviours, e.g. we cannot poll one behaviour and, if it returns something +// of interest, call a method on another. We can do this in yet another wrapper +// where we manually implement `NetworkBehaviour`, or in the outer service where we drive the +// Swarm; there we are free to call any of the behaviours as well as the Swarm. + +impl<P: StoreParams> Behaviour
<P>
{ + pub fn new( + nc: NetworkConfig, + dc: DiscoveryConfig, + mc: MembershipConfig, + cc: ContentConfig, + store: S, + ) -> Result + where + S: BitswapStore, + { + Ok(Self { + ping: Default::default(), + identify: identify::Behaviour::new(identify::Config::new( + "ipfs/1.0.0".into(), + nc.local_public_key(), + )), + discovery: discovery::Behaviour::new(nc.clone(), dc)?, + membership: membership::Behaviour::new(nc, mc)?, + content: content::Behaviour::new(cc, store), + }) + } + + pub fn discovery_mut(&mut self) -> &mut discovery::Behaviour { + &mut self.discovery + } + + pub fn membership_mut(&mut self) -> &mut membership::Behaviour { + &mut self.membership + } + + pub fn content_mut(&mut self) -> &mut content::Behaviour
<P>
{ + &mut self.content + } +} diff --git a/ipld-resolver/src/client.rs b/ipld-resolver/src/client.rs new file mode 100644 index 000000000..39c9ec001 --- /dev/null +++ b/ipld-resolver/src/client.rs @@ -0,0 +1,97 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use anyhow::anyhow; +use ipc_sdk::subnet_id::SubnetID; +use libipld::Cid; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; + +use crate::{ + service::{Request, ResolveResult}, + vote_record::SignedVoteRecord, +}; + +/// A facade to the [`Service`] to provide a nicer interface than message passing would allow on its own. +#[derive(Clone)] +pub struct Client { + request_tx: UnboundedSender, +} + +impl Client { + pub(crate) fn new(request_tx: UnboundedSender) -> Self { + Self { request_tx } + } + + /// Send a request to the [`Service`], unless it has stopped listening. + fn send_request(&self, req: Request) -> anyhow::Result<()> { + self.request_tx + .send(req) + .map_err(|_| anyhow!("disconnected")) + } + + /// Set the complete list of subnets currently supported by this node. + pub fn set_provided_subnets(&self, subnet_ids: Vec) -> anyhow::Result<()> { + let req = Request::SetProvidedSubnets(subnet_ids); + self.send_request(req) + } + + /// Add a subnet supported by this node. + pub fn add_provided_subnet(&self, subnet_id: SubnetID) -> anyhow::Result<()> { + let req = Request::AddProvidedSubnet(subnet_id); + self.send_request(req) + } + + /// Remove a subnet no longer supported by this node. + pub fn remove_provided_subnet(&self, subnet_id: SubnetID) -> anyhow::Result<()> { + let req = Request::RemoveProvidedSubnet(subnet_id); + self.send_request(req) + } + + /// Add a subnet we know really exist and we are interested in them. + pub fn pin_subnet(&self, subnet_id: SubnetID) -> anyhow::Result<()> { + let req = Request::PinSubnet(subnet_id); + self.send_request(req) + } + + /// Unpin a we are no longer interested in. + pub fn unpin_subnet(&self, subnet_id: SubnetID) -> anyhow::Result<()> { + let req = Request::UnpinSubnet(subnet_id); + self.send_request(req) + } + + /// Send a CID for resolution from a specific subnet, await its completion, + /// then return the result, to be inspected by the caller. + /// + /// Upon success, the data should be found in the store. + pub async fn resolve(&self, cid: Cid, subnet_id: SubnetID) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + let req = Request::Resolve(cid, subnet_id, tx); + self.send_request(req)?; + let res = rx.await?; + Ok(res) + } + + /// Update the rate limit based on new projections for the same timeframe + /// the `content::Behaviour` was originally configured with. This can be + /// used if we can't come up with a good estimate for the amount of data + /// we have to serve from the subnets we participate in, but we can adjust + /// them on the fly based on what we observe on chain. + pub fn update_rate_limit(&self, bytes: u32) -> anyhow::Result<()> { + let req = Request::UpdateRateLimit(bytes); + self.send_request(req) + } + + /// Publish a signed vote into a topic based on its subnet. + pub fn publish_vote(&self, vote: SignedVoteRecord) -> anyhow::Result<()> { + let req = Request::PublishVote(Box::new(vote)); + self.send_request(req) + } + + /// Publish pre-emptively to a subnet that agents in the parent subnet + /// would be subscribed to if they are interested in receiving data + /// before they would have to use [`Client::resolve`] instead. 
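+ ///
+ /// Minimal usage sketch (illustrative only; assumes `client` was obtained from a running
+ /// [`Service`] and `subnet_id` is the subnet the data belongs to):
+ ///
+ /// ```ignore
+ /// let payload = b"pre-emptively shared bytes".to_vec();
+ /// client.publish_preemptive(subnet_id.clone(), payload)?;
+ /// ```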
+ pub fn publish_preemptive(&self, subnet_id: SubnetID, data: Vec) -> anyhow::Result<()> { + let req = Request::PublishPreemptive(subnet_id, data); + self.send_request(req) + } +} diff --git a/ipld-resolver/src/hash.rs b/ipld-resolver/src/hash.rs new file mode 100644 index 000000000..7586ea4ea --- /dev/null +++ b/ipld-resolver/src/hash.rs @@ -0,0 +1,32 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +// Copyright 2019-2022 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use blake2b_simd::Params; + +/// Generates BLAKE2b hash of fixed 32 bytes size. +pub fn blake2b_256(ingest: &[u8]) -> [u8; 32] { + let digest = Params::new() + .hash_length(32) + .to_state() + .update(ingest) + .finalize(); + + let mut ret = [0u8; 32]; + ret.clone_from_slice(digest.as_bytes()); + ret +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn vector_hashing() { + let ing_vec = vec![1, 2, 3]; + + assert_eq!(blake2b_256(&ing_vec), blake2b_256(&[1, 2, 3])); + assert_ne!(blake2b_256(&ing_vec), blake2b_256(&[1, 2, 3, 4])); + } +} diff --git a/ipld-resolver/src/lib.rs b/ipld-resolver/src/lib.rs new file mode 100644 index 000000000..17331e47c --- /dev/null +++ b/ipld-resolver/src/lib.rs @@ -0,0 +1,26 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +mod behaviour; +mod client; +mod hash; +mod limiter; +mod service; +mod stats; +mod timestamp; + +mod provider_cache; +mod provider_record; +mod signed_record; +mod vote_record; + +#[cfg(any(test, feature = "arb"))] +mod arb; + +#[cfg(feature = "missing_blocks")] +pub mod missing_blocks; + +pub use behaviour::{ContentConfig, DiscoveryConfig, MembershipConfig, NetworkConfig}; +pub use client::Client; +pub use service::{Config, ConnectionConfig, Event, NoKnownPeers, Service}; +pub use timestamp::Timestamp; +pub use vote_record::VoteRecord; diff --git a/ipld-resolver/src/limiter.rs b/ipld-resolver/src/limiter.rs new file mode 100644 index 000000000..438aa10ee --- /dev/null +++ b/ipld-resolver/src/limiter.rs @@ -0,0 +1,89 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use std::hash::Hash; +use std::time::{Duration, Instant}; + +use gcra::GcraState; +pub use gcra::RateLimit; +use libp2p::gossipsub::time_cache::TimeCache; + +/// Track the rate limit of resources (e.g. bytes) consumed per key. +/// +/// Forgets keys after long periods of inactivity. +pub struct RateLimiter { + // `TimeCache` uses `Instant::now()` internally. + // It's less testable than `gcra` which allows the time to be passed in, + // but it's only used for cleaning up, so it should be okay. + cache: TimeCache, +} + +impl RateLimiter +where + K: Eq + Hash + Clone, +{ + pub fn new(ttl: Duration) -> Self { + Self { + cache: TimeCache::new(ttl), + } + } + + /// Try to add a certain amount of resources consumed to a key. + /// + /// Return `true` if the key was within limits, `false` if it needs to wait. + /// + /// The [`RateLimit`] is passed in so that we can update it dynamically + /// based on how much data we anticipate we will have to serve. + pub fn add(&mut self, limit: &RateLimit, key: K, cost: u32) -> bool { + self.add_at(limit, key, cost, Instant::now()) + } + + /// Same as [`RateLimiter::add`] but allows passing in the time, for testing. 
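+ ///
+ /// Sketch of driving the clock forward in a test (assumes a `limit` and `limiter`
+ /// configured as in the tests below):
+ ///
+ /// ```ignore
+ /// let start = Instant::now();
+ /// assert!(limiter.add_at(&limit, "peer", 1024, start));
+ /// assert!(limiter.add_at(&limit, "peer", 1024, start + one_hour));
+ /// ```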
+ pub fn add_at(&mut self, limit: &RateLimit, key: K, cost: u32, at: Instant) -> bool { + #[allow(clippy::unwrap_or_default)] + let state = self.cache.entry(key).or_insert_with(GcraState::default); + + state.check_and_modify_at(limit, at, cost).is_ok() + } +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use super::{RateLimit, RateLimiter}; + + #[test] + fn basics() { + // 10Mb per hour. + let one_hour = Duration::from_secs(60 * 60); + let rate_limit = RateLimit::new(10 * 1024 * 1024, one_hour); + let mut rate_limiter = RateLimiter::<&'static str>::new(one_hour); + + assert!(rate_limiter.add(&rate_limit, "foo", 1024)); + assert!(rate_limiter.add(&rate_limit, "foo", 5 * 1024 * 1024)); + assert!( + !rate_limiter.add(&rate_limit, "foo", 5 * 1024 * 1024), + "can't over consume" + ); + assert!( + rate_limiter.add(&rate_limit, "bar", 5 * 1024 * 1024), + "others can consume" + ); + + assert!( + rate_limiter.add_at( + &rate_limit, + "foo", + 5 * 1024 * 1024, + Instant::now() + one_hour + Duration::from_secs(1) + ), + "can consume again in the future" + ); + + let rate_limit = RateLimit::new(50 * 1024 * 1024, one_hour); + assert!( + rate_limiter.add(&rate_limit, "bar", 15 * 1024 * 1024), + "can raise quota" + ); + } +} diff --git a/ipld-resolver/src/missing_blocks.rs b/ipld-resolver/src/missing_blocks.rs new file mode 100644 index 000000000..d7dd1c93a --- /dev/null +++ b/ipld-resolver/src/missing_blocks.rs @@ -0,0 +1,33 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +// Copyright 2019-2022 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use fvm_ipld_blockstore::Blockstore; +use libipld::Cid; +use libipld::{prelude::*, store::StoreParams, Ipld}; + +/// Recursively find all [`Cid`] fields in the [`Block`] structures stored in the +/// [`Blockstore`] and return all CIDs which could *not* be retrieved from the store. +/// +/// This function is available as a convenience, to be used by any [`BitswapStore`] +/// implementation as they see fit. +pub fn missing_blocks( + bs: &mut BS, + cid: &Cid, +) -> anyhow::Result> +where + Ipld: References<
<BS::Params as StoreParams>
::Codecs>, +{ + let mut stack = vec![*cid]; + let mut missing = vec![]; + while let Some(cid) = stack.pop() { + if let Some(data) = bs.get(&cid)? { + let block = libipld::Block::
<BS::Params>
::new_unchecked(cid, data); + block.references(&mut stack)?; + } else { + missing.push(cid); + } + } + Ok(missing) +} diff --git a/ipld-resolver/src/provider_cache.rs b/ipld-resolver/src/provider_cache.rs new file mode 100644 index 000000000..1d36353a9 --- /dev/null +++ b/ipld-resolver/src/provider_cache.rs @@ -0,0 +1,388 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use std::collections::{HashMap, HashSet}; + +use ipc_sdk::subnet_id::SubnetID; +use libp2p::PeerId; + +use crate::{provider_record::ProviderRecord, Timestamp}; + +/// Change in the supported subnets of a peer. +#[derive(Debug)] +pub struct ProviderDelta { + pub is_new: bool, + pub added: Vec, + pub removed: Vec, +} + +impl ProviderDelta { + pub fn is_empty(&self) -> bool { + self.added.is_empty() && self.removed.is_empty() + } +} + +/// Track which subnets are provided for by which set of peers. +pub struct SubnetProviderCache { + /// Maximum number of subnets to track, to protect against DoS attacks, trying to + /// flood someone with subnets that don't actually exist. When the number of subnets + /// reaches this value, we remove the subnet with the smallest number of providers; + /// hopefully this would be a subnet + max_subnets: usize, + /// User defined list of subnets which will never be pruned. This can be used to + /// ward off attacks that would prevent us from adding subnets we know we want to + /// support, and not rely on dynamic discovery of their peers. + pinned_subnets: HashSet, + /// Set of peers with known addresses. Only such peers can be added to the cache. + routable_peers: HashSet, + /// List of peer IDs supporting each subnet. + subnet_providers: HashMap>, + /// Timestamp of the last record received about a peer. + peer_timestamps: HashMap, +} + +impl SubnetProviderCache { + pub fn new(max_subnets: usize, static_subnets: Vec) -> Self { + Self { + pinned_subnets: HashSet::from_iter(static_subnets), + max_subnets, + routable_peers: Default::default(), + subnet_providers: Default::default(), + peer_timestamps: Default::default(), + } + } + + /// Pin a subnet, after which it won't be pruned. + pub fn pin_subnet(&mut self, subnet_id: SubnetID) { + self.pinned_subnets.insert(subnet_id); + } + + /// Unpin a subnet, which allows it to be pruned. + pub fn unpin_subnet(&mut self, subnet_id: &SubnetID) { + self.pinned_subnets.remove(subnet_id); + } + + /// Mark a peer as routable. + /// + /// Once routable, the cache will keep track of provided subnets. + pub fn set_routable(&mut self, peer_id: PeerId) { + self.routable_peers.insert(peer_id); + } + + /// Mark a previously routable peer as unroutable. + /// + /// Once unroutable, the cache will stop tracking the provided subnets. + pub fn set_unroutable(&mut self, peer_id: PeerId) { + self.routable_peers.remove(&peer_id); + self.peer_timestamps.remove(&peer_id); + for providers in self.subnet_providers.values_mut() { + providers.remove(&peer_id); + } + } + + /// Number of routable peers. + pub fn num_routable(&mut self) -> usize { + self.routable_peers.len() + } + + /// Check if a peer has been marked as routable. + pub fn is_routable(&self, peer_id: &PeerId) -> bool { + self.routable_peers.contains(peer_id) + } + + /// Check whether we have received recent updates from a peer. + pub fn has_timestamp(&self, peer_id: &PeerId) -> bool { + self.peer_timestamps.contains_key(peer_id) + } + + /// Try to add a provider to the cache. + /// + /// Returns `None` if the peer is not routable and nothing could be added. 
+ /// + /// Returns `Some` if the peer is routable, containing the newly added + /// and newly removed associations for this peer. + pub fn add_provider(&mut self, record: &ProviderRecord) -> Option { + if !self.is_routable(&record.peer_id) { + return None; + } + + let mut delta = ProviderDelta { + is_new: !self.has_timestamp(&record.peer_id), + added: Vec::new(), + removed: Vec::new(), + }; + + let timestamp = self.peer_timestamps.entry(record.peer_id).or_default(); + + if *timestamp < record.timestamp { + *timestamp = record.timestamp; + + // The currently supported subnets of the peer. + let mut subnet_ids = HashSet::new(); + subnet_ids.extend(record.subnet_ids.iter()); + + // Remove the peer from subnets it no longer supports. + for (subnet_id, peer_ids) in self.subnet_providers.iter_mut() { + if !subnet_ids.contains(subnet_id) && peer_ids.remove(&record.peer_id) { + delta.removed.push(subnet_id.clone()); + } + } + + // Add peer to new subnets it supports now. + for subnet_id in record.subnet_ids.iter() { + let peer_ids = self.subnet_providers.entry(subnet_id.clone()).or_default(); + if peer_ids.insert(record.peer_id) { + delta.added.push(subnet_id.clone()); + } + } + + // Remove subnets that have been added but are too small to survive a pruning. + let removed_subnet_ids = self.prune_subnets(); + delta.added.retain(|id| !removed_subnet_ids.contains(id)) + } + + Some(delta) + } + + /// Ensure we don't have more than `max_subnets` number of subnets in the cache. + /// + /// Returns the removed subnet IDs. + fn prune_subnets(&mut self) -> HashSet { + let mut removed_subnet_ids = HashSet::new(); + + let to_prune = self.subnet_providers.len().saturating_sub(self.max_subnets); + if to_prune > 0 { + let mut counts = self + .subnet_providers + .iter() + .map(|(id, ps)| (id.clone(), ps.len())) + .collect::>(); + + counts.sort_by_key(|(_, count)| *count); + + for (subnet_id, _) in counts { + if self.pinned_subnets.contains(&subnet_id) { + continue; + } + self.subnet_providers.remove(&subnet_id); + removed_subnet_ids.insert(subnet_id); + if removed_subnet_ids.len() == to_prune { + break; + } + } + } + + removed_subnet_ids + } + + /// Prune any provider which hasn't provided an update since a cutoff timestamp. + /// + /// Returns the list of pruned peers. + pub fn prune_providers(&mut self, cutoff_timestamp: Timestamp) -> Vec { + let to_prune = self + .peer_timestamps + .iter() + .filter_map(|(id, ts)| { + if *ts < cutoff_timestamp { + Some(*id) + } else { + None + } + }) + .collect::>(); + + for peer_id in to_prune.iter() { + self.set_unroutable(*peer_id); + } + + to_prune + } + + /// List any known providers of a subnet. + pub fn providers_of_subnet(&self, subnet_id: &SubnetID) -> Vec { + self.subnet_providers + .get(subnet_id) + .map(|hs| hs.iter().cloned().collect()) + .unwrap_or_default() + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use ipc_sdk::subnet_id::SubnetID; + use libp2p::{identity::Keypair, PeerId}; + use quickcheck::Arbitrary; + use quickcheck_macros::quickcheck; + + use crate::{arb::ArbSubnetID, provider_record::ProviderRecord, Timestamp}; + + use super::SubnetProviderCache; + + #[derive(Debug, Clone)] + struct TestRecords(Vec); + + // Limited number of records from a limited set of peers. 
+ impl Arbitrary for TestRecords { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let rc = usize::arbitrary(g) % 20; + let pc = 1 + rc / 2; + + let mut ps = Vec::new(); + let mut rs = Vec::new(); + + for _ in 0..pc { + let pk = Keypair::generate_ed25519(); + let peer_id = pk.public().to_peer_id(); + ps.push(peer_id) + } + + for _ in 0..rc { + let peer_id = ps[usize::arbitrary(g) % ps.len()]; + let mut subnet_ids = Vec::new(); + for _ in 0..usize::arbitrary(g) % 5 { + subnet_ids.push(ArbSubnetID::arbitrary(g).0) + } + let record = ProviderRecord { + peer_id, + subnet_ids, + timestamp: Timestamp::arbitrary(g), + }; + rs.push(record) + } + + Self(rs) + } + } + + type Providers = HashMap>; + + /// Build a provider mapping to check the cache against. + fn build_providers(records: &Vec) -> Providers { + // Only the last timestamp should be kept, but it might not be unique. + let mut max_timestamps: HashMap = Default::default(); + for record in records { + let mts = max_timestamps.entry(record.peer_id).or_default(); + if *mts < record.timestamp { + *mts = record.timestamp; + } + } + + let mut providers: HashMap> = Default::default(); + let mut seen: HashSet = Default::default(); + + for record in records { + if record.timestamp != max_timestamps[&record.peer_id] { + continue; + } + if !seen.insert(record.peer_id) { + continue; + } + for subnet_id in record.subnet_ids.iter() { + providers + .entry(subnet_id.clone()) + .or_default() + .insert(record.peer_id); + } + } + + providers + } + + /// Check the cache against the reference built in the test. + fn check_providers(providers: &Providers, cache: &SubnetProviderCache) -> Result<(), String> { + for (subnet_id, exp_peer_ids) in providers { + let peer_ids = cache.providers_of_subnet(subnet_id); + if peer_ids.len() != exp_peer_ids.len() { + return Err(format!( + "expected {} peers, got {} in subnet {:?}", + exp_peer_ids.len(), + peer_ids.len(), + subnet_id + )); + } + for peer_id in peer_ids { + if !exp_peer_ids.contains(&peer_id) { + return Err("wrong peer ID".into()); + } + } + } + Ok(()) + } + + #[quickcheck] + fn prop_subnets_pruned(records: TestRecords, max_subnets: usize) -> bool { + let max_subnets = max_subnets % 10; + let mut cache = SubnetProviderCache::new(max_subnets, Vec::new()); + for record in records.0 { + cache.set_routable(record.peer_id); + if cache.add_provider(&record).is_none() { + return false; + } + } + cache.subnet_providers.len() <= max_subnets + } + + #[quickcheck] + fn prop_subnets_pinned(records: TestRecords) -> Result<(), String> { + // Find two subnets to pin. 
+ let providers = build_providers(&records.0); + if providers.len() < 2 { + return Ok(()); + } + + let subnets = providers.keys().take(2).collect::>(); + + let mut cache = SubnetProviderCache::new(3, vec![subnets[0].clone()]); + cache.pin_subnet(subnets[1].clone()); + + for record in records.0 { + cache.set_routable(record.peer_id); + cache.add_provider(&record); + } + + if !cache.subnet_providers.contains_key(subnets[0]) { + return Err("static subnet not found".into()); + } + if !cache.subnet_providers.contains_key(subnets[1]) { + return Err("pinned subnet not found".into()); + } + Ok(()) + } + + #[quickcheck] + fn prop_providers_listed(records: TestRecords) -> Result<(), String> { + let records = records.0; + let mut cache = SubnetProviderCache::new(usize::MAX, Vec::new()); + + for record in records.iter() { + cache.set_routable(record.peer_id); + cache.add_provider(record); + } + + let providers = build_providers(&records); + + check_providers(&providers, &cache) + } + + #[quickcheck] + fn prop_providers_pruned( + records: TestRecords, + cutoff_timestamp: Timestamp, + ) -> Result<(), String> { + let mut records = records.0; + let mut cache = SubnetProviderCache::new(usize::MAX, Vec::new()); + for record in records.iter() { + cache.set_routable(record.peer_id); + cache.add_provider(record); + } + cache.prune_providers(cutoff_timestamp); + + // Build a reference from only what has come after the cutoff timestamp. + records.retain(|r| r.timestamp >= cutoff_timestamp); + + let providers = build_providers(&records); + + check_providers(&providers, &cache) + } +} diff --git a/ipld-resolver/src/provider_record.rs b/ipld-resolver/src/provider_record.rs new file mode 100644 index 000000000..61826f656 --- /dev/null +++ b/ipld-resolver/src/provider_record.rs @@ -0,0 +1,125 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT + +use ipc_sdk::subnet_id::SubnetID; +use libp2p::identity::Keypair; +use libp2p::PeerId; +use serde::{Deserialize, Serialize}; + +use crate::{ + signed_record::{Record, SignedRecord}, + Timestamp, +}; + +/// Record of the ability to provide data from a list of subnets. +/// +/// Note that each the record contains the snapshot of the currently provided +/// subnets, not a delta. This means that if there were two peers using the +/// same keys running on different addresses, e.g. if the same operator ran +/// something supporting subnet A on one address, and another process supporting +/// subnet B on a different address, these would override each other, unless +/// they have different public keys (and thus peer IDs) associated with them. +/// +/// This should be okay, as in practice there is no significance to these +/// peer IDs, we can even generate a fresh key-pair every time we run the +/// resolver. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct ProviderRecord { + /// The ID of the peer we can contact to pull data from. + pub peer_id: PeerId, + /// The IDs of the subnets they are participating in. + pub subnet_ids: Vec, + /// Timestamp from when the peer published this record. + /// + /// We use a timestamp instead of just a nonce so that we + /// can drop records which are too old, indicating that + /// the peer has dropped off. 
+ pub timestamp: Timestamp, +} + +impl Record for ProviderRecord { + fn payload_type() -> &'static str { + "/ipc/provider-record" + } + + fn check_signing_key(&self, key: &libp2p::identity::PublicKey) -> bool { + self.peer_id == key.to_peer_id() + } +} + +pub type SignedProviderRecord = SignedRecord; + +impl ProviderRecord { + /// Create a new [`SignedProviderRecord`] with the current timestamp + /// and a signed envelope which can be shared with others. + pub fn signed( + key: &Keypair, + subnet_ids: Vec, + ) -> anyhow::Result { + let timestamp = Timestamp::now(); + let peer_id = key.public().to_peer_id(); + let record = ProviderRecord { + peer_id, + subnet_ids, + timestamp, + }; + let signed = SignedRecord::new(key, record)?; + Ok(signed) + } +} + +#[cfg(any(test, feature = "arb"))] +mod arb { + use libp2p::identity::Keypair; + use quickcheck::Arbitrary; + + use crate::arb::ArbSubnetID; + + use super::{ProviderRecord, SignedProviderRecord}; + + /// Create a valid [`SignedProviderRecord`] with a random key. + impl Arbitrary for SignedProviderRecord { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + // NOTE: Unfortunately the keys themselves are not deterministic, nor is the Timestamp. + let key = match u8::arbitrary(g) % 2 { + 0 => Keypair::generate_ed25519(), + _ => Keypair::generate_secp256k1(), + }; + + // Limit the number of subnets and the depth of keys so data generation doesn't take too long. + let mut subnet_ids = Vec::new(); + for _ in 0..u8::arbitrary(g) % 5 { + let subnet_id = ArbSubnetID::arbitrary(g); + subnet_ids.push(subnet_id.0) + } + + ProviderRecord::signed(&key, subnet_ids).expect("error creating signed envelope") + } + } +} + +#[cfg(test)] +mod tests { + use libp2p::core::SignedEnvelope; + use quickcheck_macros::quickcheck; + + use super::SignedProviderRecord; + + #[quickcheck] + fn prop_roundtrip(signed_record: SignedProviderRecord) -> bool { + crate::signed_record::tests::prop_roundtrip(signed_record) + } + + #[quickcheck] + fn prop_tamper_proof(signed_record: SignedProviderRecord, idx: usize) -> bool { + let mut envelope_bytes = signed_record.into_envelope().into_protobuf_encoding(); + // Do some kind of mutation to a random byte in the envelope; after that it should not validate. + let idx = idx % envelope_bytes.len(); + envelope_bytes[idx] = u8::MAX - envelope_bytes[idx]; + + match SignedEnvelope::from_protobuf_encoding(&envelope_bytes) { + Err(_) => true, // Corrupted the protobuf itself. 
+ Ok(envelope) => SignedProviderRecord::from_signed_envelope(envelope).is_err(), + } + } +} diff --git a/ipld-resolver/src/service.rs b/ipld-resolver/src/service.rs new file mode 100644 index 000000000..a88890f2a --- /dev/null +++ b/ipld-resolver/src/service.rs @@ -0,0 +1,585 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use std::collections::HashMap; +use std::time::Duration; + +use anyhow::anyhow; +use bloom::{BloomFilter, ASMS}; +use ipc_sdk::subnet_id::SubnetID; +use libipld::store::StoreParams; +use libipld::Cid; +use libp2p::futures::StreamExt; +use libp2p::swarm::SwarmEvent; +use libp2p::{ + core::{muxing::StreamMuxerBox, transport::Boxed}, + identity::Keypair, + mplex, noise, + swarm::{ConnectionLimits, SwarmBuilder}, + yamux, Multiaddr, PeerId, Swarm, Transport, +}; +use libp2p::{identify, ping}; +use libp2p_bitswap::{BitswapResponse, BitswapStore}; +use log::{debug, error, info, trace, warn}; +use prometheus::Registry; +use rand::seq::SliceRandom; +use tokio::select; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::sync::oneshot::{self, Sender}; + +use crate::behaviour::{ + self, content, discovery, membership, Behaviour, BehaviourEvent, ConfigError, ContentConfig, + DiscoveryConfig, MembershipConfig, NetworkConfig, +}; +use crate::client::Client; +use crate::stats; +use crate::vote_record::{SignedVoteRecord, VoteRecord}; + +/// Result of attempting to resolve a CID. +pub type ResolveResult = anyhow::Result<()>; + +/// Channel to complete the results with. +type ResponseChannel = oneshot::Sender; + +/// State of a query. The fallback peers can be used +/// if the current attempt fails. +struct Query { + cid: Cid, + subnet_id: SubnetID, + fallback_peer_ids: Vec, + response_channel: ResponseChannel, +} + +/// Keeps track of where to send query responses to. +type QueryMap = HashMap; + +/// Error returned when we tried to get a CID from a subnet for +/// which we currently have no peers to contact +#[derive(thiserror::Error, Debug)] +#[error("No known peers for subnet {0}")] +pub struct NoKnownPeers(SubnetID); + +#[derive(Debug, Clone)] +pub struct ConnectionConfig { + /// The address where we will listen to incoming connections. + pub listen_addr: Multiaddr, + /// Maximum number of incoming connections. + pub max_incoming: u32, + /// Expected number of peers, for sizing the Bloom filter. + pub expected_peer_count: u32, + /// Maximum number of peers to send Bitswap requests to in a single attempt. + pub max_peers_per_query: u32, + /// Maximum number of events in the push-based broadcast channel before a slow + /// consumer gets an error because it's falling behind. + pub event_buffer_capacity: u32, +} + +#[derive(Debug, Clone)] +pub struct Config { + pub network: NetworkConfig, + pub discovery: DiscoveryConfig, + pub membership: MembershipConfig, + pub connection: ConnectionConfig, + pub content: ContentConfig, +} + +/// Internal requests to enqueue to the [`Service`] +pub(crate) enum Request { + SetProvidedSubnets(Vec), + AddProvidedSubnet(SubnetID), + RemoveProvidedSubnet(SubnetID), + PublishVote(Box), + PublishPreemptive(SubnetID, Vec), + PinSubnet(SubnetID), + UnpinSubnet(SubnetID), + Resolve(Cid, SubnetID, ResponseChannel), + RateLimitUsed(PeerId, usize), + UpdateRateLimit(u32), +} + +/// Events that arise from the subnets, pushed to the clients, +/// not part of a request-response action. +#[derive(Clone, Debug)] +pub enum Event { + /// Received a vote about in a subnet about a CID. 
+ ReceivedVote(Box), + /// Received raw pre-emptive data published to a pinned subnet. + ReceivedPreemptive(SubnetID, Vec), +} + +/// The `Service` handles P2P communication to resolve IPLD content by wrapping and driving a number of `libp2p` behaviours. +pub struct Service { + peer_id: PeerId, + listen_addr: Multiaddr, + swarm: Swarm>, + /// To match finished queries to response channels. + queries: QueryMap, + /// For receiving requests from the clients and self. + request_rx: mpsc::UnboundedReceiver, + /// For creating new clients and sending messages to self. + request_tx: mpsc::UnboundedSender, + /// For broadcasting events to all clients. + event_tx: broadcast::Sender, + /// To avoid looking up the same peer over and over. + background_lookup_filter: BloomFilter, + /// To limit the number of peers contacted in a Bitswap resolution attempt. + max_peers_per_query: usize, +} + +impl Service
<P>
{ + /// Build a [`Service`] and a [`Client`] with the default `tokio` transport. + pub fn new(config: Config, store: S) -> Result + where + S: BitswapStore, + { + Self::new_with_transport(config, store, build_transport) + } + + /// Build a [`Service`] and a [`Client`] by passing in a transport factory function. + /// + /// The main goal is to be facilitate testing with a [`MemoryTransport`]. + pub fn new_with_transport( + config: Config, + store: S, + transport: F, + ) -> Result + where + S: BitswapStore, + F: FnOnce(Keypair) -> Boxed<(PeerId, StreamMuxerBox)>, + { + let peer_id = config.network.local_peer_id(); + let transport = transport(config.network.local_key.clone()); + let behaviour = Behaviour::new( + config.network, + config.discovery, + config.membership, + config.content, + store, + )?; + + // NOTE: Hardcoded values from Forest. Will leave them as is until we know we need to change. + + let limits = ConnectionLimits::default() + .with_max_pending_incoming(Some(10)) + .with_max_pending_outgoing(Some(30)) + .with_max_established_incoming(Some(config.connection.max_incoming)) + .with_max_established_outgoing(None) // Allow bitswap to connect to subnets we did not anticipate when we started. + .with_max_established_per_peer(Some(5)); + + let swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, peer_id) + .connection_limits(limits) + .notify_handler_buffer_size(std::num::NonZeroUsize::new(20).expect("Not zero")) + .connection_event_buffer_size(64) + .build(); + + let (request_tx, request_rx) = mpsc::unbounded_channel(); + let (event_tx, _) = broadcast::channel(config.connection.event_buffer_capacity as usize); + + let service = Self { + peer_id, + listen_addr: config.connection.listen_addr, + swarm, + queries: Default::default(), + request_rx, + request_tx, + event_tx, + background_lookup_filter: BloomFilter::with_rate( + 0.1, + config.connection.expected_peer_count, + ), + max_peers_per_query: config.connection.max_peers_per_query as usize, + }; + + Ok(service) + } + + /// Create a new [`Client`] instance bound to this `Service`. + /// + /// The [`Client`] is geared towards request-response interactions, + /// while the `Receiver` returned by `subscribe` is used for events + /// which weren't initiated by the `Client`. + pub fn client(&self) -> Client { + Client::new(self.request_tx.clone()) + } + + /// Create a new [`broadcast::Receiver`] instance bound to this `Service`, + /// which will be notified upon each event coming from any of the subnets + /// the `Service` is subscribed to. + /// + /// The consumers are expected to process events quick enough to be within + /// the configured capacity of the broadcast channel, or otherwise be able + /// to deal with message loss if they fall behind. + /// + /// # Notes + /// + /// This is not part of the [`Client`] because `Receiver::recv` takes + /// a mutable reference and it would prevent the [`Client`] being used + /// for anything else. + /// + /// One alternative design would be to accept an interface similar to + /// [`BitswapStore`] that we can pass events to. In that case we would + /// have to create an internal event queue to stand in front of it, + /// and because these events arrive from the outside, it would still + /// have to have limited capacity. + /// + /// Because the channel has limited capacity, we have to take care not + /// to use it for signaling critical events that we want to await upon. 
+ /// For example if we used this to signal the readiness of bootstrapping, + /// we should make sure we have not yet subscribed to external events + /// which could drown it out. + /// + /// One way to achieve this is for the consumer of the events to redistribute + /// them into priorities event queues, some bounded, some unbounded. + pub fn subscribe(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } + + /// Register Prometheus metrics. + pub fn register_metrics(&mut self, registry: &Registry) -> anyhow::Result<()> { + self.content_mut().register_metrics(registry)?; + stats::register_metrics(registry)?; + Ok(()) + } + + /// Start the swarm listening for incoming connections and drive the events forward. + pub async fn run(mut self) -> anyhow::Result<()> { + // Start the swarm. + info!("running service on {}", self.listen_addr); + Swarm::listen_on(&mut self.swarm, self.listen_addr.clone())?; + + loop { + select! { + swarm_event = self.swarm.next() => match swarm_event { + // Events raised by our behaviours. + Some(SwarmEvent::Behaviour(event)) => { + self.handle_behaviour_event(event) + }, + // Connection events are handled by the behaviours, passed directly from the Swarm. + Some(_) => { }, + // The connection is closed. + None => { break; }, + }, + request = self.request_rx.recv() => match request { + // A Client sent us a request. + Some(req) => self.handle_request(req), + // This shouldn't happen because the service has a copy of the sender. + // All Client instances have been dropped. + None => { break; } + } + }; + } + Ok(()) + } + + /// Handle events that the [`NetworkBehaviour`] for our [`Behaviour`] macro generated, one for each field. + fn handle_behaviour_event(&mut self, event: BehaviourEvent
<P>
) { + match event { + BehaviourEvent::Ping(e) => self.handle_ping_event(e), + BehaviourEvent::Identify(e) => self.handle_identify_event(e), + BehaviourEvent::Discovery(e) => self.handle_discovery_event(e), + BehaviourEvent::Membership(e) => self.handle_membership_event(e), + BehaviourEvent::Content(e) => self.handle_content_event(e), + } + } + + // Copied from Forest. + fn handle_ping_event(&mut self, event: ping::Event) { + let peer_id = event.peer.to_base58(); + match event.result { + Ok(ping::Success::Ping { rtt }) => { + stats::PING_SUCCESS.inc(); + stats::PING_RTT.observe(rtt.as_millis() as f64); + trace!( + "PingSuccess::Ping rtt to {} from {} is {} ms", + peer_id, + self.peer_id, + rtt.as_millis() + ); + } + Ok(ping::Success::Pong) => { + trace!("PingSuccess::Pong from {peer_id} to {}", self.peer_id); + } + Err(ping::Failure::Timeout) => { + stats::PING_TIMEOUT.inc(); + debug!("PingFailure::Timeout from {peer_id} to {}", self.peer_id); + } + Err(ping::Failure::Other { error }) => { + stats::PING_FAILURE.inc(); + warn!( + "PingFailure::Other from {peer_id} to {}: {error}", + self.peer_id + ); + } + Err(ping::Failure::Unsupported) => { + warn!("Banning peer {peer_id} due to protocol error"); + self.swarm.ban_peer_id(event.peer); + } + } + } + + fn handle_identify_event(&mut self, event: identify::Event) { + if let identify::Event::Error { peer_id, error } = event { + stats::IDENTIFY_FAILURE.inc(); + warn!("Error identifying {peer_id}: {error}") + } else if let identify::Event::Received { peer_id, info } = event { + stats::IDENTIFY_RECEIVED.inc(); + debug!("protocols supported by {peer_id}: {:?}", info.protocols); + debug!("adding identified address of {peer_id} to {}", self.peer_id); + self.discovery_mut().add_identified(&peer_id, info); + } + } + + fn handle_discovery_event(&mut self, event: discovery::Event) { + match event { + discovery::Event::Added(peer_id, _) => { + debug!("adding routable peer {peer_id} to {}", self.peer_id); + self.membership_mut().set_routable(peer_id) + } + discovery::Event::Removed(peer_id) => { + debug!("removing unroutable peer {peer_id} from {}", self.peer_id); + self.membership_mut().set_unroutable(peer_id) + } + } + } + + fn handle_membership_event(&mut self, event: membership::Event) { + match event { + membership::Event::Skipped(peer_id) => { + debug!("skipped adding provider {peer_id} to {}", self.peer_id); + // Don't repeatedly look up peers we can't add to the routing table. + if self.background_lookup_filter.insert(&peer_id) { + debug!( + "triggering background lookup of {peer_id} on {}", + self.peer_id + ); + self.discovery_mut().background_lookup(peer_id) + } + } + membership::Event::Updated(_, _) => {} + membership::Event::Removed(_) => {} + membership::Event::ReceivedVote(vote) => { + let event = Event::ReceivedVote(vote); + if self.event_tx.send(event).is_err() { + debug!("dropped received vote because there are no subscribers") + } + } + membership::Event::ReceivedPreemptive(subnet_id, data) => { + let event = Event::ReceivedPreemptive(subnet_id, data); + if self.event_tx.send(event).is_err() { + debug!("dropped received preemptive data because there are no subscribers") + } + } + } + } + + /// Handle Bitswap lookup result. 
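+ ///
+ /// `Complete` closes out a tracked query (fallback peers are retried in `resolve_query`),
+ /// while `BitswapForward` relays the response to the original requester and reports the
+ /// served bytes back to the service as a `RateLimitUsed` request.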
+ fn handle_content_event(&mut self, event: content::Event) { + match event { + content::Event::Complete(query_id, result) => { + if let Some(query) = self.queries.remove(&query_id) { + self.resolve_query(query, result); + } else { + warn!("query ID not found"); + } + } + content::Event::BitswapForward { + peer_id, + response_rx, + response_tx, + } => { + let request_tx = self.request_tx.clone(); + tokio::task::spawn(async move { + if let Ok(res) = response_rx.await { + if let BitswapResponse::Block(bz) = &res { + let _ = request_tx.send(Request::RateLimitUsed(peer_id, bz.len())); + } + // Forward, if the listener is still open. + let _ = response_tx.send(res); + } + }); + } + } + } + + /// Handle an internal request coming from a [`Client`]. + fn handle_request(&mut self, request: Request) { + match request { + Request::SetProvidedSubnets(ids) => { + if let Err(e) = self.membership_mut().set_provided_subnets(ids) { + warn!("failed to publish set provided subnets: {e}") + } + } + Request::AddProvidedSubnet(id) => { + if let Err(e) = self.membership_mut().add_provided_subnet(id) { + warn!("failed to publish added provided subnet: {e}") + } + } + Request::RemoveProvidedSubnet(id) => { + if let Err(e) = self.membership_mut().remove_provided_subnet(id) { + warn!("failed to publish removed provided subnet: {e}") + } + } + Request::PublishVote(vote) => { + if let Err(e) = self.membership_mut().publish_vote(*vote) { + warn!("failed to publish vote: {e}") + } + } + Request::PublishPreemptive(subnet_id, data) => { + if let Err(e) = self.membership_mut().publish_preemptive(subnet_id, data) { + warn!("failed to publish pre-emptive data: {e}") + } + } + Request::PinSubnet(id) => { + if let Err(e) = self.membership_mut().pin_subnet(id) { + warn!("error pinning subnet: {e}") + } + } + Request::UnpinSubnet(id) => { + if let Err(e) = self.membership_mut().unpin_subnet(&id) { + warn!("error unpinning subnet: {e}") + } + } + Request::Resolve(cid, subnet_id, response_channel) => { + self.start_query(cid, subnet_id, response_channel) + } + Request::RateLimitUsed(peer_id, bytes) => { + self.content_mut().rate_limit_used(peer_id, bytes) + } + Request::UpdateRateLimit(bytes) => self.content_mut().update_rate_limit(bytes), + } + } + + /// Start a CID resolution. + fn start_query(&mut self, cid: Cid, subnet_id: SubnetID, response_channel: ResponseChannel) { + let mut peers = self.membership_mut().providers_of_subnet(&subnet_id); + + stats::CONTENT_RESOLVE_PEERS.observe(peers.len() as f64); + + if peers.is_empty() { + stats::CONTENT_RESOLVE_NO_PEERS.inc(); + send_resolve_result(response_channel, Err(anyhow!(NoKnownPeers(subnet_id)))); + } else { + // Connect to them in a random order, so as not to overwhelm any specific peer. + peers.shuffle(&mut rand::thread_rng()); + + // Prioritize peers we already have an established connection with. + let (connected, known) = peers + .into_iter() + .partition::, _>(|id| self.swarm.is_connected(id)); + + stats::CONTENT_CONNECTED_PEERS.observe(connected.len() as f64); + + let peers = [connected, known].into_iter().flatten().collect(); + let (peers, fallback) = self.split_peers_for_query(peers); + + let query = Query { + cid, + subnet_id, + response_channel, + fallback_peer_ids: fallback, + }; + + let query_id = self.content_mut().resolve(cid, peers); + + self.queries.insert(query_id, query); + } + } + + /// Handle the results from a resolve attempt. If it succeeded, notify the + /// listener. 
+    /// Otherwise, if there are fallback peers to try, start another query with the
+    /// next batch. By default the fallback peers are the ones we know support the
+    /// subnet, but weren't connected to when we first attempted the resolution.
+    fn resolve_query(&mut self, mut query: Query, result: ResolveResult) {
+        match result {
+            Ok(_) => {
+                stats::CONTENT_RESOLVE_SUCCESS.inc();
+                send_resolve_result(query.response_channel, result)
+            }
+            Err(_) if query.fallback_peer_ids.is_empty() => {
+                stats::CONTENT_RESOLVE_FAILURE.inc();
+                send_resolve_result(query.response_channel, result)
+            }
+            Err(e) => {
+                stats::CONTENT_RESOLVE_FALLBACK.inc();
+                debug!(
+                    "resolving {} from {} failed with {}, but there are {} fallback peers to try",
+                    query.cid,
+                    query.subnet_id,
+                    e,
+                    query.fallback_peer_ids.len()
+                );
+
+                // Try to resolve from the next batch of peers.
+                let peers = std::mem::take(&mut query.fallback_peer_ids);
+                let (peers, fallback) = self.split_peers_for_query(peers);
+                let query_id = self.content_mut().resolve(query.cid, peers);
+
+                // Leave the rest for later.
+                query.fallback_peer_ids = fallback;
+
+                self.queries.insert(query_id, query);
+            }
+        }
+    }
+
+    /// Split peers into a group we query now and a group we fall back on if the current batch fails.
+    fn split_peers_for_query(&self, mut peers: Vec<PeerId>) -> (Vec<PeerId>, Vec<PeerId>) {
+        let size = std::cmp::min(self.max_peers_per_query, peers.len());
+        let fallback = peers.split_off(size);
+        (peers, fallback)
+    }
+
+    // The following are helper functions because Rust Analyzer has trouble recognising
+    // that `swarm.behaviour_mut()` is a legal call.
+
+    fn discovery_mut(&mut self) -> &mut behaviour::discovery::Behaviour {
+        self.swarm.behaviour_mut().discovery_mut()
+    }
+    fn membership_mut(&mut self) -> &mut behaviour::membership::Behaviour {
+        self.swarm.behaviour_mut().membership_mut()
+    }
+    fn content_mut(&mut self) -> &mut behaviour::content::Behaviour<P> {
+        self.swarm.behaviour_mut().content_mut()
+    }
+}
+
+/// Respond to the sender of the query, if they are still listening.
+fn send_resolve_result(tx: Sender<ResolveResult>, res: ResolveResult) {
+    if tx.send(res).is_err() {
+        error!("error sending resolve result; listener closed")
+    }
+}
+
+/// Builds the transport stack that libp2p will communicate over.
+///
+/// Based on the equivalent in Forest.
+pub fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox)> {
+    let tcp_transport =
+        || libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new().nodelay(true));
+    let transport = libp2p::dns::TokioDnsConfig::system(tcp_transport()).unwrap();
+    let auth_config = {
+        let dh_keys = noise::Keypair::<noise::X25519Spec>::new()
+            .into_authentic(&local_key)
+            .expect("Noise key generation failed");
+
+        noise::NoiseConfig::xx(dh_keys).into_authenticated()
+    };
+
+    let mplex_config = {
+        let mut mplex_config = mplex::MplexConfig::new();
+        mplex_config.set_max_buffer_size(usize::MAX);
+
+        let mut yamux_config = yamux::YamuxConfig::default();
+        yamux_config.set_max_buffer_size(16 * 1024 * 1024);
+        yamux_config.set_receive_window_size(16 * 1024 * 1024);
+        // yamux_config.set_window_update_mode(WindowUpdateMode::OnRead);
+        libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
+    };
+
+    transport
+        .upgrade(libp2p::core::upgrade::Version::V1)
+        .authenticate(auth_config)
+        .multiplex(mplex_config)
+        .timeout(Duration::from_secs(20))
+        .boxed()
+}
diff --git a/ipld-resolver/src/signed_record.rs b/ipld-resolver/src/signed_record.rs
new file mode 100644
index 000000000..a6afea078
--- /dev/null
+++ b/ipld-resolver/src/signed_record.rs
@@ -0,0 +1,120 @@
+// Copyright 2022-2023 Protocol Labs
+// SPDX-License-Identifier: MIT
+
+use libp2p::core::signed_envelope;
+use libp2p::identity::PublicKey;
+use libp2p::{core::SignedEnvelope, identity::Keypair};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+
+const DOMAIN_SEP: &str = "/ipc/ipld/resolver";
+
+pub trait Record {
+    /// Payload type for the [`SignedEnvelope`].
+    fn payload_type() -> &'static str;
+    /// Check that the [`PublicKey`] recovered from the [`SignedEnvelope`]
+    /// is consistent with the payload.
+    fn check_signing_key(&self, key: &PublicKey) -> bool;
+}
+
+/// A record (e.g. a `ProviderRecord`) paired with a [`SignedEnvelope`] proving that
+/// the peer indeed produced it, for example that it is ready to provide the data
+/// for the listed subnets.
+#[derive(Debug, Clone)]
+pub struct SignedRecord<R> {
+    /// The deserialized and validated record.
+    record: R,
+    /// The [`SignedEnvelope`] from which the record was deserialized.
+    envelope: SignedEnvelope,
+}
+
+// Based on `libp2p_core::peer_record::PeerRecord`
+impl<R> SignedRecord<R>
+where
+    R: Record + Serialize + DeserializeOwned,
+{
+    /// Create a new [`SignedRecord`] with a signed envelope
+    /// which can be shared with others.
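+    ///
+    /// A minimal usage sketch (`record` stands for any value whose type implements
+    /// [`Record`]; illustrative only, not a doctest):
+    ///
+    /// ```ignore
+    /// let key = Keypair::generate_secp256k1();
+    /// let signed = SignedRecord::new(&key, record)?;
+    /// // The envelope can be sent over the wire and checked on the other side.
+    /// let bytes = signed.envelope().clone().into_protobuf_encoding();
+    /// let received = SignedRecord::<R>::from_bytes(&bytes)?;
+    /// assert_eq!(received.record(), signed.record());
+    /// ```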
+ pub fn new(key: &Keypair, record: R) -> anyhow::Result { + let payload = fvm_ipld_encoding::to_vec(&record)?; + let envelope = SignedEnvelope::new( + key, + DOMAIN_SEP.to_owned(), + R::payload_type().as_bytes().to_vec(), + payload, + )?; + Ok(Self { record, envelope }) + } + + pub fn from_signed_envelope(envelope: SignedEnvelope) -> Result { + let (payload, signing_key) = envelope + .payload_and_signing_key(DOMAIN_SEP.to_owned(), R::payload_type().as_bytes())?; + + let record = fvm_ipld_encoding::from_slice::(payload)?; + + if !record.check_signing_key(signing_key) { + return Err(FromEnvelopeError::MismatchedSignature); + } + + Ok(Self { record, envelope }) + } + + /// Deserialize then check the domain tags and the signature. + pub fn from_bytes(bytes: &[u8]) -> anyhow::Result { + let envelope = SignedEnvelope::from_protobuf_encoding(bytes)?; + let signed_record = Self::from_signed_envelope(envelope)?; + Ok(signed_record) + } + + pub fn record(&self) -> &R { + &self.record + } + + pub fn envelope(&self) -> &SignedEnvelope { + &self.envelope + } + + pub fn into_record(self) -> R { + self.record + } + + pub fn into_envelope(self) -> SignedEnvelope { + self.envelope + } +} + +#[derive(thiserror::Error, Debug)] +pub enum FromEnvelopeError { + /// Failed to extract the payload from the envelope. + #[error("Failed to extract payload from envelope")] + BadPayload(#[from] signed_envelope::ReadPayloadError), + /// Failed to decode the provided bytes as the record type. + #[error("Failed to decode bytes as record")] + InvalidRecord(#[from] fvm_ipld_encoding::Error), + /// The signer of the envelope is different than the peer id in the record. + #[error("The signer of the envelope is different than the peer id in the record")] + MismatchedSignature, +} + +#[cfg(test)] +pub mod tests { + use fvm_ipld_encoding::de::DeserializeOwned; + use libp2p::core::SignedEnvelope; + use serde::Serialize; + + use super::{Record, SignedRecord}; + + pub fn prop_roundtrip(signed_record: SignedRecord) -> bool + where + R: Serialize + DeserializeOwned + Record + PartialEq, + { + let envelope_bytes = signed_record.envelope().clone().into_protobuf_encoding(); + + let envelope = + SignedEnvelope::from_protobuf_encoding(&envelope_bytes).expect("envelope roundtrip"); + + let signed_record2 = + SignedRecord::::from_signed_envelope(envelope).expect("record roundtrip"); + + signed_record2.into_record() == *signed_record.record() + } +} diff --git a/ipld-resolver/src/stats.rs b/ipld-resolver/src/stats.rs new file mode 100644 index 000000000..0a1493ca6 --- /dev/null +++ b/ipld-resolver/src/stats.rs @@ -0,0 +1,115 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use lazy_static::lazy_static; +use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry}; + +macro_rules! metrics { + ($($name:ident : $type:ty = $make:expr);* $(;)?) => { + $( + lazy_static! { + pub static ref $name: $type = $make.unwrap(); + } + )* + + pub fn register_metrics(registry: &Registry) -> anyhow::Result<()> { + $(registry.register(Box::new($name.clone()))?;)* + Ok(()) + } + }; +} + +metrics! 
+{
+    PING_RTT: Histogram =
+        Histogram::with_opts(HistogramOpts::new("ping_rtt", "Ping roundtrip time"));
+
+    PING_TIMEOUT: IntCounter =
+        IntCounter::new("ping_timeouts", "Number of timed out pings");
+
+    PING_FAILURE: IntCounter =
+        IntCounter::new("ping_failure", "Number of failed pings");
+
+    PING_SUCCESS: IntCounter =
+        IntCounter::new("ping_success", "Number of successful pings");
+
+    IDENTIFY_FAILURE: IntCounter =
+        IntCounter::new("identify_failure", "Number of Identify errors");
+
+    IDENTIFY_RECEIVED: IntCounter =
+        IntCounter::new("identify_received", "Number of Identify infos received");
+
+    DISCOVERY_BACKGROUND_LOOKUP: IntCounter = IntCounter::new(
+        "discovery_background_lookup",
+        "Number of background lookups started",
+    );
+
+    DISCOVERY_CONNECTED_PEERS: IntGauge =
+        IntGauge::new("discovery_connected_peers", "Number of connections");
+
+    MEMBERSHIP_SKIPPED_PEERS: IntCounter =
+        IntCounter::new("membership_skipped_peers", "Number of providers skipped");
+
+    MEMBERSHIP_ROUTABLE_PEERS: IntGauge =
+        IntGauge::new("membership_routable_peers", "Number of routable peers");
+
+    MEMBERSHIP_PROVIDER_PEERS: IntGauge =
+        IntGauge::new("membership_provider_peers", "Number of unique providers");
+
+    MEMBERSHIP_UNKNOWN_TOPIC: IntCounter = IntCounter::new(
+        "membership_unknown_topic",
+        "Number of messages with unknown topic"
+    );
+
+    MEMBERSHIP_INVALID_MESSAGE: IntCounter = IntCounter::new(
+        "membership_invalid_message",
+        "Number of invalid messages received"
+    );
+
+    MEMBERSHIP_PUBLISH_SUCCESS: IntCounter = IntCounter::new(
+        "membership_publish_total",
+        "Number of published messages"
+    );
+
+    MEMBERSHIP_PUBLISH_FAILURE: IntCounter = IntCounter::new(
+        "membership_publish_failure",
+        "Number of failed publish attempts"
+    );
+
+    CONTENT_RESOLVE_RUNNING: IntGauge = IntGauge::new(
+        "content_resolve_running",
+        "Number of currently running content resolutions"
+    );
+
+    CONTENT_RESOLVE_NO_PEERS: IntCounter = IntCounter::new(
+        "content_resolve_no_peers",
+        "Number of resolutions with no known peers"
+    );
+
+    CONTENT_RESOLVE_SUCCESS: IntCounter = IntCounter::new(
+        "content_resolve_success",
+        "Number of successful resolutions"
+    );
+
+    CONTENT_RESOLVE_FAILURE: IntCounter = IntCounter::new(
+        "content_resolve_failure",
+        "Number of failed resolutions"
+    );
+
+    CONTENT_RESOLVE_FALLBACK: IntCounter = IntCounter::new(
+        "content_resolve_fallback",
+        "Number of resolutions that fall back on secondary peers"
+    );
+
+    CONTENT_RESOLVE_PEERS: Histogram = Histogram::with_opts(HistogramOpts::new(
+        "content_resolve_peers",
+        "Number of peers found for resolution from a subnet"
+    ));
+
+    CONTENT_CONNECTED_PEERS: Histogram = Histogram::with_opts(HistogramOpts::new(
+        "content_connected_peers",
+        "Number of connected peers in a resolution"
+    ));
+
+    CONTENT_RATE_LIMITED: IntCounter = IntCounter::new(
+        "content_rate_limited",
+        "Number of rate limited requests"
+    );
+}
diff --git a/ipld-resolver/src/timestamp.rs b/ipld-resolver/src/timestamp.rs
new file mode 100644
index 000000000..d531a6253
--- /dev/null
+++ b/ipld-resolver/src/timestamp.rs
@@ -0,0 +1,53 @@
+// Copyright 2022-2023 Protocol Labs
+// SPDX-License-Identifier: MIT
+use serde::{Deserialize, Serialize};
+use std::ops::{Add, Sub};
+use std::time::{Duration, SystemTime};
+
+/// Unix timestamp in seconds since epoch, which we can use to select the
+/// more recent message during gossiping.
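+///
+/// A small illustration (not a doctest):
+///
+/// ```ignore
+/// use std::time::Duration;
+///
+/// let now = Timestamp::now();
+/// let a_minute_ago = now - Duration::from_secs(60);
+/// // `Ord` is derived, so the more recent of two timestamps can be picked with `max`.
+/// assert_eq!(now, now.max(a_minute_ago));
+/// ```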
+#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug, Serialize, Deserialize, Default)] +pub struct Timestamp(u64); + +impl Timestamp { + /// Current timestamp. + pub fn now() -> Self { + let secs = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("now() is never before UNIX_EPOCH") + .as_secs(); + Self(secs) + } + + /// Seconds elapsed since Unix epoch. + pub fn as_secs(&self) -> u64 { + self.0 + } +} + +impl Sub for Timestamp { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self { + Self(self.as_secs().saturating_sub(rhs.as_secs())) + } +} + +impl Add for Timestamp { + type Output = Self; + + fn add(self, rhs: Duration) -> Self { + Self(self.as_secs().saturating_add(rhs.as_secs())) + } +} + +#[cfg(any(test, feature = "arb"))] +mod arb { + use super::Timestamp; + + impl quickcheck::Arbitrary for Timestamp { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + Self(u64::arbitrary(g).saturating_add(1)) + } + } +} diff --git a/ipld-resolver/src/vote_record.rs b/ipld-resolver/src/vote_record.rs new file mode 100644 index 000000000..6302f8398 --- /dev/null +++ b/ipld-resolver/src/vote_record.rs @@ -0,0 +1,124 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use ipc_sdk::subnet_id::SubnetID; +use libipld::Cid; +use libp2p::identity::{Keypair, PublicKey}; +use serde::de::Error; +use serde::{Deserialize, Serialize}; + +use crate::{ + signed_record::{Record, SignedRecord}, + Timestamp, +}; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ValidatorKey(PublicKey); + +impl Serialize for ValidatorKey { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let bz = self.0.to_protobuf_encoding(); + bz.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for ValidatorKey { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let bz = Vec::::deserialize(deserializer)?; + match PublicKey::from_protobuf_encoding(&bz) { + Ok(pk) => Ok(Self(pk)), + Err(e) => Err(D::Error::custom(format!("error decoding PublicKey: {e}"))), + } + } +} + +/// Vote by a validator about the validity/availability/finality +/// of a CID in a given subnet. +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] +pub struct VoteRecord { + /// Public key of the validator. + pub public_key: ValidatorKey, + /// The subnet in which the vote is valid, to prevent a vote on the same CID + /// in one subnet being replayed by an attacker on a different subnet. + pub subnet_id: SubnetID, + /// The CID of the content the vote is about. + pub cid: Cid, + /// The claim of the vote, in case there can be votes about multiple facets + /// regarding the CID. + pub claim: String, + /// Timestamp to thwart potential replay attacks. + pub timestamp: Timestamp, +} + +impl Record for VoteRecord { + fn payload_type() -> &'static str { + "/ipc/vote-record" + } + + fn check_signing_key(&self, key: &libp2p::identity::PublicKey) -> bool { + self.public_key.0 == *key + } +} + +pub type SignedVoteRecord = SignedRecord; + +impl VoteRecord { + /// Create a new [`SignedVoteRecord`] with the current timestamp + /// and a signed envelope which can be shared with others. 
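+    ///
+    /// Illustrative sketch (`subnet_id` and `cid` are placeholders, and `client` is a
+    /// [`Client`](crate::Client) handle obtained from the service; not a doctest):
+    ///
+    /// ```ignore
+    /// let key = Keypair::generate_secp256k1();
+    /// let vote = VoteRecord::signed(&key, subnet_id, cid, "finalized".into())?;
+    /// client.publish_vote(vote)?;
+    /// ```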
+ pub fn signed( + key: &Keypair, + subnet_id: SubnetID, + cid: Cid, + claim: String, + ) -> anyhow::Result { + let timestamp = Timestamp::now(); + let record = VoteRecord { + public_key: ValidatorKey(key.public()), + subnet_id, + cid, + claim, + timestamp, + }; + let signed = SignedRecord::new(key, record)?; + Ok(signed) + } +} + +#[cfg(any(test, feature = "arb"))] +mod arb { + use libp2p::identity::Keypair; + use quickcheck::Arbitrary; + + use crate::arb::{ArbCid, ArbSubnetID}; + + use super::{SignedVoteRecord, VoteRecord}; + + /// Create a valid [`SignedVoteRecord`] with a random key. + impl Arbitrary for SignedVoteRecord { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let key = Keypair::generate_secp256k1(); + let subnet_id = ArbSubnetID::arbitrary(g).0; + let cid = ArbCid::arbitrary(g).0; + let claim = String::arbitrary(g); + + VoteRecord::signed(&key, subnet_id, cid, claim).expect("error creating signed envelope") + } + } +} + +#[cfg(test)] +mod tests { + use quickcheck_macros::quickcheck; + + use super::SignedVoteRecord; + + #[quickcheck] + fn prop_roundtrip(signed_record: SignedVoteRecord) -> bool { + crate::signed_record::tests::prop_roundtrip(signed_record) + } +} diff --git a/ipld-resolver/tests/smoke.rs b/ipld-resolver/tests/smoke.rs new file mode 100644 index 000000000..b38684397 --- /dev/null +++ b/ipld-resolver/tests/smoke.rs @@ -0,0 +1,397 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +//! Test that a cluster of IPLD resolver can be started in memory, +//! that they bootstrap from each other and are able to resolve CIDs. +//! +//! Run the tests as follows: +//! ```ignore +//! RUST_LOG=debug cargo test -p ipc_ipld_resolver --test smoke resolve +//! ``` + +// For inspiration on testing libp2p look at: +// * https://github.com/libp2p/rust-libp2p/blob/v0.50.0/misc/multistream-select/tests/transport.rs +// * https://github.com/libp2p/rust-libp2p/blob/v0.50.0/protocols/ping/tests/ping.rs +// * https://github.com/libp2p/rust-libp2p/blob/v0.50.0/protocols/gossipsub/tests/smoke.rs +// They all use a different combination of `MemoryTransport` and executors. +// These tests attempt to use `MemoryTransport` so it's quicker, with `Swarm::with_tokio_executor` +// so we can leave the polling to the `Service` running in a `Task`, rather than do it from the test +// (although these might be orthogonal). 
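+//
+// Each agent listens on an in-memory address of the form `/memory/<random-u64>`; when a node
+// bootstraps from another, the `ClusterBuilder` below appends `/p2p/<peer-id>` to that address
+// and hands it over as a static address for discovery.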
+ +use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +use anyhow::anyhow; +use fvm_ipld_encoding::IPLD_RAW; +use fvm_ipld_hamt::Hamt; +use fvm_shared::{address::Address, ActorID}; +use ipc_ipld_resolver::{ + Client, Config, ConnectionConfig, ContentConfig, DiscoveryConfig, Event, MembershipConfig, + NetworkConfig, Service, VoteRecord, +}; +use ipc_sdk::subnet_id::SubnetID; +use libipld::{ + multihash::{Code, MultihashDigest}, + Cid, +}; +use libp2p::{ + core::{ + muxing::StreamMuxerBox, + transport::{Boxed, MemoryTransport}, + }, + identity::Keypair, + mplex, + multiaddr::Protocol, + plaintext::PlainText2Config, + yamux, Multiaddr, PeerId, Transport, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; + +mod store; +use store::*; +use tokio::{sync::broadcast, time::timeout}; + +struct Agent { + config: Config, + client: Client, + events: broadcast::Receiver, + store: TestBlockstore, +} + +struct Cluster { + agents: Vec, +} + +impl Cluster { + pub fn size(&self) -> usize { + self.agents.len() + } +} + +struct ClusterBuilder { + size: u32, + rng: StdRng, + services: Vec>, + agents: Vec, +} + +impl ClusterBuilder { + fn new(size: u32) -> Self { + // Each port has to be unique, so each test must use a different seed. + // This is shared between all instances. + static COUNTER: AtomicU64 = AtomicU64::new(0); + let seed = COUNTER.fetch_add(1, Ordering::Relaxed); + Self::new_with_seed(size, seed) + } + + fn new_with_seed(size: u32, seed: u64) -> Self { + Self { + size, + rng: rand::rngs::StdRng::seed_from_u64(seed), + services: Default::default(), + agents: Default::default(), + } + } + + /// Add a node with randomized address, optionally bootstrapping from an existing node. + fn add_node(&mut self, bootstrap: Option) { + let bootstrap_addr = bootstrap.map(|i| { + let config = &self.agents[i].config; + let peer_id = config.network.local_peer_id(); + let mut addr = config.connection.listen_addr.clone(); + addr.push(Protocol::P2p(peer_id.into())); + addr + }); + let config = make_config(&mut self.rng, self.size, bootstrap_addr); + let (service, store) = make_service(config.clone()); + let client = service.client(); + let events = service.subscribe(); + self.services.push(service); + self.agents.push(Agent { + config, + client, + events, + store, + }); + } + + /// Start running all services + fn run(self) -> Cluster { + for service in self.services { + tokio::task::spawn(async move { service.run().await.expect("error running service") }); + } + Cluster { + agents: self.agents, + } + } +} + +/// Start a cluster of agents from a single bootstrap node, +/// make available some content on one agent and resolve it from another. +#[tokio::test] +async fn single_bootstrap_single_provider_resolve_one() { + let _ = env_logger::builder().is_test(true).try_init(); + //env_logger::init(); + + // Choose agents. + let cluster_size = 3; + let bootstrap_idx = 0; + let provider_idx = 1; + let resolver_idx = 2; + + // TODO: Get the seed from QuickCheck + let mut builder = ClusterBuilder::new(cluster_size); + + // Build a cluster of nodes. + for i in 0..builder.size { + builder.add_node(if i == 0 { None } else { Some(bootstrap_idx) }); + } + + // Start the swarms. + let mut cluster = builder.run(); + + // Insert a CID of a complex recursive data structure. + let cid = insert_test_data(&mut cluster.agents[provider_idx]).expect("failed to insert data"); + + // Sanity check that we can read the data back. 
+ check_test_data(&mut cluster.agents[provider_idx], &cid).expect("failed to read back the data"); + + // Wait a little for the cluster to connect. + // TODO: Wait on some condition instead of sleep. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Announce the support of some subnet. + let subnet_id = make_subnet_id(1001); + + cluster.agents[provider_idx] + .client + .add_provided_subnet(subnet_id.clone()) + .expect("failed to add provided subnet"); + + // Wait a little for the gossip to spread and peer lookups to happen, then another round of gossip. + // TODO: Wait on some condition instead of sleep. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Ask for the CID to be resolved from by another peer. + cluster.agents[resolver_idx] + .client + .resolve(cid, subnet_id.clone()) + .await + .expect("failed to send request") + .expect("failed to resolve content"); + + // Check that the CID is deposited into the store of the requestor. + check_test_data(&mut cluster.agents[resolver_idx], &cid).expect("failed to resolve from store"); +} + +/// Start two agents, subscribe to the same subnet, publish and receive a vote. +#[tokio::test] +async fn single_bootstrap_publish_receive_vote() { + let _ = env_logger::builder().is_test(true).try_init(); + //env_logger::init(); + + // TODO: Get the seed from QuickCheck + let mut builder = ClusterBuilder::new(2); + + // Build a cluster of nodes. + for i in 0..builder.size { + builder.add_node(if i == 0 { None } else { Some(0) }); + } + + // Start the swarms. + let mut cluster = builder.run(); + + // Wait a little for the cluster to connect. + // TODO: Wait on some condition instead of sleep. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Announce the support of some subnet. + let subnet_id = make_subnet_id(1001); + + for i in 0..cluster.size() { + cluster.agents[i] + .client + .add_provided_subnet(subnet_id.clone()) + .expect("failed to add provided subnet"); + } + + // Wait a little for the gossip to spread and peer lookups to happen, then another round of gossip. + // TODO: Wait on some condition instead of sleep. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Vote on some random CID. + let validator_key = Keypair::generate_secp256k1(); + let cid = Cid::new_v1(IPLD_RAW, Code::Sha2_256.digest(b"foo")); + let vote = VoteRecord::signed(&validator_key, subnet_id, cid, "finalized".into()) + .expect("failed to sign vote"); + + // Pubilish vote + cluster.agents[0] + .client + .publish_vote(vote.clone()) + .expect("failed to send vote"); + + // Receive vote. + let event = timeout(Duration::from_secs(2), cluster.agents[1].events.recv()) + .await + .expect("timeout receiving vote") + .expect("error receiving vote"); + + if let Event::ReceivedVote(v) = event { + assert_eq!(&*v, vote.record()); + } else { + panic!("unexpected {event:?}") + } +} + +/// Start two agents, pin a subnet, publish preemptively and receive. +#[tokio::test] +async fn single_bootstrap_publish_receive_preemptive() { + let _ = env_logger::builder().is_test(true).try_init(); + + // TODO: Get the seed from QuickCheck + let mut builder = ClusterBuilder::new(2); + + // Build a cluster of nodes. + for i in 0..builder.size { + builder.add_node(if i == 0 { None } else { Some(0) }); + } + + // Start the swarms. + let mut cluster = builder.run(); + + // Wait a little for the cluster to connect. + // TODO: Wait on some condition instead of sleep. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Pin a subnet on the bootstrap node. 
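+    // Pinning keeps the agent subscribed to the subnet's gossip topics even though it does
+    // not announce itself as a data provider, which is what lets it receive the pre-emptive
+    // publication sent below.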
+ let subnet_id = make_subnet_id(1001); + + cluster.agents[0] + .client + .pin_subnet(subnet_id.clone()) + .expect("failed to pin subnet"); + + // TODO: Wait on some condition instead of sleep. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Publish some content from the other agent. + let data = vec![1, 2, 3]; + cluster.agents[1] + .client + .publish_preemptive(subnet_id.clone(), data.clone()) + .expect("failed to send vote"); + + // Receive pre-emptive data.. + let event = timeout(Duration::from_secs(2), cluster.agents[0].events.recv()) + .await + .expect("timeout receiving data") + .expect("error receiving data"); + + if let Event::ReceivedPreemptive(s, d) = event { + assert_eq!(s, subnet_id); + assert_eq!(d, data); + } else { + panic!("unexpected {event:?}") + } +} + +fn make_service(config: Config) -> (Service, TestBlockstore) { + let store = TestBlockstore::default(); + let svc = Service::new_with_transport(config, store.clone(), build_transport).unwrap(); + (svc, store) +} + +fn make_config(rng: &mut StdRng, cluster_size: u32, bootstrap_addr: Option) -> Config { + let config = Config { + connection: ConnectionConfig { + listen_addr: Multiaddr::from(Protocol::Memory(rng.gen::())), + expected_peer_count: cluster_size, + max_incoming: cluster_size, + max_peers_per_query: cluster_size, + event_buffer_capacity: cluster_size, + }, + network: NetworkConfig { + local_key: Keypair::generate_secp256k1(), + network_name: "smoke-test".to_owned(), + }, + discovery: DiscoveryConfig { + static_addresses: bootstrap_addr.iter().cloned().collect(), + target_connections: cluster_size.try_into().unwrap(), + enable_kademlia: true, + }, + membership: MembershipConfig { + static_subnets: vec![], + max_subnets: 10, + publish_interval: Duration::from_secs(5), + min_time_between_publish: Duration::from_secs(1), + max_provider_age: Duration::from_secs(60), + }, + content: ContentConfig { + rate_limit_bytes: 1 << 20, + rate_limit_period: Duration::from_secs(60), + }, + }; + + config +} + +/// Builds an in-memory transport for libp2p to communicate over. +fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox)> { + let auth_config = PlainText2Config { + local_public_key: local_key.public(), + }; + + let mplex_config = { + let mut mplex_config = mplex::MplexConfig::new(); + mplex_config.set_max_buffer_size(usize::MAX); + + let mut yamux_config = yamux::YamuxConfig::default(); + yamux_config.set_max_buffer_size(16 * 1024 * 1024); + yamux_config.set_receive_window_size(16 * 1024 * 1024); + // yamux_config.set_window_update_mode(WindowUpdateMode::OnRead); + libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config) + }; + + MemoryTransport::default() + .upgrade(libp2p::core::upgrade::Version::V1) + .authenticate(auth_config) + .multiplex(mplex_config) + .boxed() +} + +/// Make a subnet under a rootnet. +fn make_subnet_id(actor_id: ActorID) -> SubnetID { + let act = Address::new_id(actor_id); + let root = SubnetID::new_root(0); + SubnetID::new_from_parent(&root, act) +} + +/// Insert a HAMT into the block store of an agent. +fn insert_test_data(agent: &mut Agent) -> anyhow::Result { + let mut hamt: Hamt<_, String, u32> = Hamt::new(&agent.store); + + // Insert enough data into the HAMT to make sure it grows from a single `Node`. 
+ for i in 0..1000 { + hamt.set(i, format!("value {i}"))?; + } + let cid = hamt.flush()?; + + Ok(cid) +} + +fn check_test_data(agent: &mut Agent, cid: &Cid) -> anyhow::Result<()> { + let hamt: Hamt<_, String, u32> = Hamt::load(cid, &agent.store)?; + + // Check all the data inserted by `insert_test_data`. + for i in 0..1000 { + match hamt.get(&i)? { + None => return Err(anyhow!("key {i} is missing")), + Some(v) if *v != format!("value {i}") => return Err(anyhow!("unexpected value: {v}")), + _ => {} + } + } + + Ok(()) +} diff --git a/ipld-resolver/tests/store/mod.rs b/ipld-resolver/tests/store/mod.rs new file mode 100644 index 000000000..414399b70 --- /dev/null +++ b/ipld-resolver/tests/store/mod.rs @@ -0,0 +1,54 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: MIT +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use anyhow::Result; +use fvm_ipld_blockstore::Blockstore; +use ipc_ipld_resolver::missing_blocks::missing_blocks; +use libipld::Cid; +use libp2p_bitswap::BitswapStore; + +#[derive(Debug, Clone, Default)] +pub struct TestBlockstore { + blocks: Arc>>>, +} + +impl Blockstore for TestBlockstore { + fn has(&self, k: &Cid) -> Result { + Ok(self.blocks.read().unwrap().contains_key(k)) + } + + fn get(&self, k: &Cid) -> Result>> { + Ok(self.blocks.read().unwrap().get(k).cloned()) + } + + fn put_keyed(&self, k: &Cid, block: &[u8]) -> Result<()> { + self.blocks.write().unwrap().insert(*k, block.into()); + Ok(()) + } +} + +pub type TestStoreParams = libipld::DefaultParams; + +impl BitswapStore for TestBlockstore { + type Params = TestStoreParams; + + fn contains(&mut self, cid: &Cid) -> Result { + Blockstore::has(self, cid) + } + + fn get(&mut self, cid: &Cid) -> Result>> { + Blockstore::get(self, cid) + } + + fn insert(&mut self, block: &libipld::Block) -> Result<()> { + Blockstore::put_keyed(self, block.cid(), block.data()) + } + + fn missing_blocks(&mut self, cid: &Cid) -> Result> { + missing_blocks::(self, cid) + } +}
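+
+// A minimal sanity check for the in-memory store above; illustrative only, it exercises the
+// same `Blockstore` methods the resolver relies on (`put_keyed`, `has`, `get`).
+#[cfg(test)]
+mod tests {
+    use super::TestBlockstore;
+    use fvm_ipld_blockstore::Blockstore;
+    use libipld::{
+        multihash::{Code, MultihashDigest},
+        Cid,
+    };
+
+    #[test]
+    fn put_get_roundtrip() {
+        let store = TestBlockstore::default();
+        let data = b"hello".to_vec();
+        // 0x55 is the multicodec code for raw bytes.
+        let cid = Cid::new_v1(0x55, Code::Sha2_256.digest(&data));
+        store.put_keyed(&cid, &data).unwrap();
+        assert!(store.has(&cid).unwrap());
+        assert_eq!(store.get(&cid).unwrap(), Some(data));
+    }
+}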