Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p)!: Extend Head interface's Head method with ...HeadOption, introduce WithTrustedHead opt #53

Merged
merged 32 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d48a0ee
feat(p2p): Extend Head request with RequestOptions and introduce With…
renaynay Jun 21, 2023
092ceb6
doc(p2p): add godoc for WithSubjectiveInit opt
renaynay Jun 21, 2023
d0da16a
tiny
renaynay Jun 21, 2023
5a4712e
fix(p2p): Only query subset of tracked peers
renaynay Jun 22, 2023
8535382
refactor(p2p): rename minTrustedHeadResponses to minHeadResponses
renaynay Jun 23, 2023
67c1aa7
rename(p2p): rename minUntrustedHeadRequests -> numUntrustedHeadRequests
renaynay Jun 30, 2023
1310469
rename(p2p): rename arg in Head
renaynay Jun 30, 2023
8b5f2d4
refactor(go-header)!: Break Head interface by introducing variadic op…
renaynay Jun 30, 2023
9e5a4dd
refactor(p2p): unexport getPeers method on tracker
renaynay Jun 30, 2023
95b10b3
refactor(sync): Add header.WithSubjectiveInit option to sync subjecti…
renaynay Jun 30, 2023
5aa8e67
prog
renaynay Jun 30, 2023
fa04d6a
refactor: Change from RequestOption to HeadOption
renaynay Jul 4, 2023
fadc926
refactor: WithSubjectiveInit --> DisableSubjectiveInit(header)
renaynay Jul 4, 2023
8e41a70
fixes after rebase
renaynay Jul 12, 2023
dcc79a2
test(sync): Add test for DisabledSubjectiveInit head req from syncer
renaynay Jul 12, 2023
d2a8324
refactor: Rename HeadRequestParams --> HeadParams
renaynay Jul 12, 2023
cbc18c5
refactor: WithDisabledSubjectiveInit --> WithTrustedHead
renaynay Jul 17, 2023
4eaaca6
doc(sync): add test doc
renaynay Jul 17, 2023
a117a86
refactor(p2p): getPeers takes amount as arg and returns []peer.ID, error
renaynay Jul 17, 2023
524e377
test(sync): Remove trusting period custom (relic from previous test a…
renaynay Jul 18, 2023
8fe3558
refactor: Address @Wondertan review
renaynay Jul 18, 2023
6d138e6
refactor(p2p): Introduce bootstrap function to peertracker and launch…
renaynay Jul 21, 2023
d3dae25
test(p2p): add test for bootstrap function
renaynay Jul 21, 2023
d4042e0
refactor(p2p): Remove pre-emptive p.connected inside connectToPeer, "…
renaynay Jul 31, 2023
d5f54af
chore(p2p): Add some debug logging
renaynay Aug 1, 2023
53c02ea
doc(p2p): document flake case
renaynay Aug 2, 2023
299731a
test(p2p): use assert eventually
renaynay Aug 2, 2023
a00e68c
refactor(p2p): make getPeers blocking
renaynay Aug 2, 2023
20879c3
fix(p2p): ticker --> notif ch
renaynay Aug 2, 2023
882dd48
refactor(p2p): bootstrap blocks on attempts to connect to bootstrappe…
renaynay Aug 2, 2023
fa3c102
refactor(p2p): addressing hlib nits
renaynay Aug 3, 2023
bb1bed9
Merge branch 'main' into subjective-init
renaynay Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions headertest/dummy_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ func (d *DummyHeader) Verify(header header.Header) error {
}

if header.Height() <= d.Height() {
return fmt.Errorf("expected new header Height to be larger than old header Time")
return fmt.Errorf("expected new header Height %d to be larger than old header Height %d", header.Height(), d.Height())
}

if header.Time().Before(d.Time()) {
return fmt.Errorf("expected new header Time to be after old header Time")
return fmt.Errorf("expected new header Time %v to be after old header Time %v", header.Time(), d.Time())
}

return nil
Expand Down
1 change: 1 addition & 0 deletions headertest/dummy_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *DummySuite) NextHeader() *DummyHeader {
}

dh := RandDummyHeader(s.t)
dh.Raw.Time = s.head.Time().Add(time.Nanosecond)
dh.Raw.Height = s.head.Height() + 1
dh.Raw.PreviousHash = s.head.Hash()
_ = dh.rehash()
Expand Down
2 changes: 1 addition & 1 deletion headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *Store[H]) Height() uint64 {
return uint64(m.HeadHeight)
}

func (m *Store[H]) Head(context.Context) (H, error) {
func (m *Store[H]) Head(context.Context, ...header.HeadOption) (H, error) {
return m.Headers[m.HeadHeight], nil
}

Expand Down
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,5 @@ type Getter[H Header] interface {
// reporting it.
type Head[H Header] interface {
// Head returns the latest known header.
Head(context.Context) (H, error)
Head(context.Context, ...HeadOption) (H, error)
}
2 changes: 1 addition & 1 deletion local/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (l *Exchange[H]) Stop(context.Context) error {
return nil
}

func (l *Exchange[H]) Head(ctx context.Context) (H, error) {
func (l *Exchange[H]) Head(ctx context.Context, _ ...header.HeadOption) (H, error) {
return l.store.Head(ctx)
}

Expand Down
17 changes: 17 additions & 0 deletions opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package header

type HeadOption func(opts *HeadParams)

// HeadParams contains options to be used for Head interface methods
type HeadParams struct {
// TrustedHead allows the caller of Head to specify a trusted header
// against which the underlying implementation of Head can verify against.
TrustedHead Header
}

// WithTrustedHead sets the TrustedHead parameter to the given header.
func WithTrustedHead(verified Header) func(opts *HeadParams) {
return func(opts *HeadParams) {
opts.TrustedHead = verified
}
}
86 changes: 59 additions & 27 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package p2p
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sort"
Expand All @@ -21,10 +20,15 @@ import (

var log = logging.Logger("header/p2p")

// the minimum number of headers of the same height received from trusted peers
// to determine the network head. If all trusted header will return headers with
// non-equal height, then the highest header will be chosen.
const minTrustedHeadResponses = 2
// minHeadResponses is the minimum number of headers of the same height
// received from peers to determine the network head. If all trusted peers
// will return headers with non-equal height, then the highest header will be
// chosen.
const minHeadResponses = 2
renaynay marked this conversation as resolved.
Show resolved Hide resolved

// numUntrustedHeadRequests is the number of head requests to be made to
// the network in order to determine the network head.
var numUntrustedHeadRequests = 4
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

// Exchange enables sending outbound HeaderRequests to the network as well as
// handling inbound HeaderRequests from the network.
Expand Down Expand Up @@ -72,23 +76,19 @@ func NewExchange[H header.Header](
return ex, nil
}

func (ex *Exchange[H]) Start(context.Context) error {
func (ex *Exchange[H]) Start(ctx context.Context) error {
ex.ctx, ex.cancel = context.WithCancel(context.Background())
log.Infow("client: starting client", "protocol ID", ex.protocolID)

trustedPeers := ex.trustedPeers()

for _, p := range trustedPeers {
// Try to pre-connect to trusted peers.
// We don't really care if we succeed at this point
// and just need any peers in the peerTracker asap
go func(p peer.ID) {
err := ex.host.Connect(ex.ctx, peer.AddrInfo{ID: p})
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
log.Debugw("err connecting to a bootstrap peer", "err", err, "peer", p)
}
}(p)
// bootstrap the peerTracker with trusted peers as well as previously seen
// peers if provided. If previously seen peers were provided, bootstrap
// method will block until the given number of connections were attempted
// (successful or not).
err := ex.peerTracker.bootstrap(ctx, ex.trustedPeers(), numUntrustedHeadRequests)
if err != nil {
return err
}

go ex.peerTracker.gc()
go ex.peerTracker.track()
return nil
Expand All @@ -106,7 +106,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// The Head must be verified thereafter where possible.
// We request in parallel all the trusted peers, compare their response
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption) (H, error) {
log.Debug("requesting head")

reqCtx := ctx
Expand All @@ -121,30 +121,62 @@ func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
defer cancel()
}

reqParams := header.HeadParams{}
for _, opt := range opts {
opt(&reqParams)
}

var (
zero H
trustedPeers = ex.trustedPeers()
headerRespCh = make(chan H, len(trustedPeers))
headerReq = &p2p_pb.HeaderRequest{
peers = ex.trustedPeers()
useTrackedPeers bool
)

trackedPeers, err := ex.peerTracker.getPeers(numUntrustedHeadRequests)
useTrackedPeers = reqParams.TrustedHead != nil && err == nil
// the TrustedHead field indicates whether the Exchange should use
// trusted peers for its Head request. If nil, trusted peers will
// be used. If non-nil, Exchange will ask several peers (as long as enough
// of them exist in the peer tracker) from its network for their Head and
// verify against the given trusted header.
if useTrackedPeers {
peers = trackedPeers
}

var (
zero H
renaynay marked this conversation as resolved.
Show resolved Hide resolved
headerReq = &p2p_pb.HeaderRequest{
Data: &p2p_pb.HeaderRequest_Origin{Origin: uint64(0)},
Amount: 1,
}
headerRespCh = make(chan H, len(peers))
)
for _, from := range trustedPeers {
for _, from := range peers {
go func(from peer.ID) {
headers, err := ex.request(reqCtx, from, headerReq)
if err != nil {
log.Errorw("head request to trusted peer failed", "trustedPeer", from, "err", err)
headerRespCh <- zero
return
}
// if tracked (untrusted) peers were requested, verify head
if useTrackedPeers {
err = reqParams.TrustedHead.Verify(headers[0])
if err != nil {
log.Errorw("head request to untrusted peer failed", "untrusted peer", from,
"err", err)
// bad head was given, block peer
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
ex.peerTracker.blockPeer(from, fmt.Errorf("returned bad head: %w", err))
headerRespCh <- zero
return
}
}
// request ensures that the result slice will have at least one Header
headerRespCh <- headers[0]
}(from)
}

headers := make([]H, 0, len(trustedPeers))
for range trustedPeers {
headers := make([]H, 0, len(peers))
MSevey marked this conversation as resolved.
Show resolved Hide resolved
for range peers {
select {
case h := <-headerRespCh:
if !h.IsZero() {
Expand Down Expand Up @@ -346,7 +378,7 @@ func bestHead[H header.Header](result []H) (H, error) {

// try to find Header with the maximum height that was received at least from 2 peers
for _, res := range result {
if counter[res.Hash().String()] >= minTrustedHeadResponses {
if counter[res.Hash().String()] >= minHeadResponses {
return res, nil
}
}
Expand Down
71 changes: 63 additions & 8 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"strconv"
"testing"
"time"

Expand All @@ -18,23 +19,77 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

const networkID = "private"

func TestExchange_RequestHead(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// perform header request
header, err := exchg.Head(context.Background())
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// decrease the minimum number of untrusted peers to request head from
// for testing purposes
numUntrustedHeadRequests = 2

hosts := createMocknet(t, 3)
exchg, trustedStore := createP2PExAndServer(t, hosts[0], hosts[1])

// create new server-side exchange that will act as the tracked peer
// it will have a higher chain head than the trusted peer so that the
// test can determine which peer was asked
trackedStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 50)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](hosts[2], trackedStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
err = serverSideEx.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = serverSideEx.Stop(ctx)
require.NoError(t, err)
})

assert.Equal(t, store.Headers[store.HeadHeight].Height(), header.Height())
assert.Equal(t, store.Headers[store.HeadHeight].Hash(), header.Hash())
tests := []struct {
requestFromTrusted bool
lastHeader header.Header
expectedHeight int64
expectedHash header.Hash
}{
// routes to trusted peer only
{
requestFromTrusted: true,
lastHeader: trustedStore.Headers[trustedStore.HeadHeight-1],
expectedHeight: trustedStore.HeadHeight,
expectedHash: trustedStore.Headers[trustedStore.HeadHeight].Hash(),
},
// routes to tracked peers and takes highest chain head
{
requestFromTrusted: false,
lastHeader: trackedStore.Headers[trackedStore.HeadHeight-1],
expectedHeight: trackedStore.HeadHeight,
expectedHash: trackedStore.Headers[trackedStore.HeadHeight].Hash(),
},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var opts []header.HeadOption
if !tt.requestFromTrusted {
opts = append(opts, header.WithTrustedHead(tt.lastHeader))
}

header, err := exchg.Head(ctx, opts...)
require.NoError(t, err)

assert.Equal(t, tt.expectedHeight, header.Height())
assert.Equal(t, tt.expectedHash, header.Hash())
})
}
}

func TestExchange_RequestHead_UnresponsivePeer(t *testing.T) {
Expand Down Expand Up @@ -532,7 +587,7 @@ func (t *timedOutStore) HasAt(_ context.Context, _ uint64) bool {
return true
}

func (t *timedOutStore) Head(_ context.Context) (*headertest.DummyHeader, error) {
func (t *timedOutStore) Head(context.Context, ...header.HeadOption) (*headertest.DummyHeader, error) {
time.Sleep(t.timeout)
return nil, header.ErrNoHead
}
Loading