Skip to content

Commit

Permalink
Merge pull request #117 from renaynay/get-range
Browse files Browse the repository at this point in the history
refactor!: Merge `GetVerifiedRange` and `GetRangeByHeight` in Getter interface and remove redundant endpoint
  • Loading branch information
renaynay authored Oct 12, 2023
2 parents df01474 + 16969fd commit 26daa91
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 126 deletions.
24 changes: 14 additions & 10 deletions headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,20 @@ func (m *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
return m.Headers[height], nil
}

func (m *Store[H]) GetRangeByHeight(ctx context.Context, from, to uint64) ([]H, error) {
headers := make([]H, to-from)
func (m *Store[H]) GetRange(ctx context.Context, from, to uint64) ([]H, error) {
return m.getRangeByHeight(ctx, from, to)
}

// GetRangeByHeight returns headers in range [from; to).
func (m *Store[H]) GetRangeByHeight(ctx context.Context, fromHead H, to uint64) ([]H, error) {
from := fromHead.Height() + 1
return m.getRangeByHeight(ctx, from, to)
}

func (m *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H, error) {
amount := to - from
headers := make([]H, amount)

// As the requested range is [from; to),
// check that (to-1) height in request is less than
// the biggest header height in store.
Expand All @@ -79,14 +91,6 @@ func (m *Store[H]) GetRangeByHeight(ctx context.Context, from, to uint64) ([]H,
return headers, nil
}

func (m *Store[H]) GetVerifiedRange(
ctx context.Context,
h H,
to uint64,
) ([]H, error) {
return m.GetRangeByHeight(ctx, uint64(h.Height())+1, to)
}

func (m *Store[H]) Has(context.Context, header.Hash) (bool, error) {
return false, nil
}
Expand Down
11 changes: 6 additions & 5 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type Store[H Header[H]] interface {
// It returns the amount of successfully applied headers,
// so caller can understand what given header was invalid, if any.
Append(context.Context, ...H) error

// GetRange returns the range [from:to).
GetRange(context.Context, uint64, uint64) ([]H, error)
}

// Getter contains the behavior necessary for a component to retrieve
Expand All @@ -106,12 +109,10 @@ type Getter[H Header[H]] interface {
// GetByHeight returns the Header corresponding to the given block height.
GetByHeight(context.Context, uint64) (H, error)

// GetRangeByHeight returns the given range of Headers.
GetRangeByHeight(ctx context.Context, from, amount uint64) ([]H, error)

// GetVerifiedRange requests the header range from the provided Header and
// GetRangeByHeight requests the header range from the provided Header and
// verifies that the returned headers are adjacent to each other.
GetVerifiedRange(ctx context.Context, from H, amount uint64) ([]H, error)
// Expected to return the range [from.Height()+1:to).
GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error)
}

// Head contains the behavior necessary for a component to retrieve
Expand Down
11 changes: 2 additions & 9 deletions local/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,9 @@ func (l *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error)
return l.store.GetByHeight(ctx, height)
}

func (l *Exchange[H]) GetRangeByHeight(ctx context.Context, origin, amount uint64) ([]H, error) {
if amount == 0 {
return nil, nil
}
return l.store.GetRangeByHeight(ctx, origin, origin+amount)
}

func (l *Exchange[H]) GetVerifiedRange(ctx context.Context, from H, amount uint64,
func (l *Exchange[H]) GetRangeByHeight(ctx context.Context, from H, to uint64,
) ([]H, error) {
return l.store.GetVerifiedRange(ctx, from, uint64(from.Height())+amount+1)
return l.store.GetRangeByHeight(ctx, from, to)
}

func (l *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
Expand Down
26 changes: 5 additions & 21 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,36 +218,20 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error
return headers[0], nil
}

// GetRangeByHeight performs a request for the given range of Headers
// to the network. Note that the Headers must be verified thereafter.
func (ex *Exchange[H]) GetRangeByHeight(ctx context.Context, from, amount uint64) ([]H, error) {
if amount == 0 {
return make([]H, 0), nil
}
if amount > header.MaxRangeRequestSize {
return nil, header.ErrHeadersLimitExceeded
}
session := newSession[H](ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout)
defer session.close()
return session.getRangeByHeight(ctx, from, amount, ex.Params.MaxHeadersPerRangeRequest)
}

// GetVerifiedRange performs a request for the given range of Headers to the network and
// GetRangeByHeight performs a request for the given range of Headers to the network and
// ensures that returned headers are correct against the passed one.
func (ex *Exchange[H]) GetVerifiedRange(
func (ex *Exchange[H]) GetRangeByHeight(
ctx context.Context,
from H,
amount uint64,
to uint64,
) ([]H, error) {
if amount == 0 {
return make([]H, 0), nil
}
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
return session.getRangeByHeight(ctx, uint64(from.Height())+1, amount, ex.Params.MaxHeadersPerRangeRequest)
amount := to - (from.Height() + 1)
return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
}

// Get performs a request for the Header by the given hash corresponding
Expand Down
83 changes: 50 additions & 33 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,36 +166,46 @@ func TestExchange_RequestHeader(t *testing.T) {
assert.Equal(t, store.Headers[5].Hash(), header.Hash())
}

func TestExchange_RequestHeaders(t *testing.T) {
func TestExchange_GetRangeByHeight(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

from, err := store.GetByHeight(ctx, 1)
require.NoError(t, err)

firstHeaderInRangeHeight := from.Height() + 1
lastHeaderInRangeHeight := uint64(4)
to := lastHeaderInRangeHeight + 1
expectedLenHeaders := to - firstHeaderInRangeHeight // expected amount

// perform expected request
gotHeaders, err := exchg.GetRangeByHeight(context.Background(), 1, 5)
gotHeaders, err := exchg.GetRangeByHeight(ctx, from, to)
require.NoError(t, err)

assert.Len(t, gotHeaders, int(expectedLenHeaders))
assert.Equal(t, firstHeaderInRangeHeight, gotHeaders[0].Height())
assert.Equal(t, lastHeaderInRangeHeight, gotHeaders[len(gotHeaders)-1].Height())

for _, got := range gotHeaders {
assert.Equal(t, store.Headers[got.Height()].Height(), got.Height())
assert.Equal(t, store.Headers[got.Height()].Hash(), got.Hash())
}
}

func TestExchange_RequestVerifiedHeaders(t *testing.T) {
func TestExchange_GetRangeByHeight_FailsVerification(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// perform expected request
h := store.Headers[1]
_, err := exchg.GetVerifiedRange(context.Background(), h, 3)
require.NoError(t, err)
}

func TestExchange_RequestVerifiedHeadersFails(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
store.Headers[2] = store.Headers[3]
store.Headers[3].VerifyFailure = true // force a verification failure on the 3rd header
// perform expected request
h := store.Headers[1]
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
t.Cleanup(cancel)
_, err := exchg.GetVerifiedRange(ctx, h, 3)

// requests range (1:4)
_, err := exchg.GetRangeByHeight(ctx, h, 4)
assert.Error(t, err)

// ensure that peer was added to the blacklist
Expand All @@ -209,7 +219,7 @@ func TestExchange_RequestVerifiedHeadersFails(t *testing.T) {
func TestExchange_RequestFullRangeHeaders(t *testing.T) {
// create mocknet with 5 peers
hosts := createMocknet(t, 5)
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), int(header.MaxRangeRequestSize))
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), int(header.MaxRangeRequestSize)+1)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)

Expand Down Expand Up @@ -238,28 +248,23 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
// request headers from 1 to totalAmount(80)
headers, err := exchange.GetRangeByHeight(ctx, 1, header.MaxRangeRequestSize)

gen, err := store.GetByHeight(ctx, 1)
require.NoError(t, err)
require.Len(t, headers, int(header.MaxRangeRequestSize))
}

// TestExchange_RequestHeadersLimitExceeded tests that the Exchange instance will return
// header.ErrHeadersLimitExceeded if the requested range will be move than MaxRangeRequestSize.
func TestExchange_RequestHeadersLimitExceeded(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
_, err := exchg.GetRangeByHeight(context.Background(), 1, 600)
require.Error(t, err)
require.ErrorAs(t, err, &header.ErrHeadersLimitExceeded)
// request the max amount of headers
to := gen.Height() + header.MaxRangeRequestSize + 1 // add 1 to account for `from` being inclusive
headers, err := exchange.GetRangeByHeight(ctx, gen, to)
require.NoError(t, err)
assert.Len(t, headers, int(header.MaxRangeRequestSize))
}

// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
// from another peer with lower score after receiving header.ErrNotFound
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
hosts := createMocknet(t, 3)
// create client + server(it does not have needed headers)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
Expand All @@ -273,7 +278,11 @@ func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
exchg.peerTracker.peerLk.Unlock()
_, err = exchg.GetRangeByHeight(context.Background(), 5, 3)

h, err := store.GetByHeight(context.Background(), 5)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
require.NoError(t, err)
// ensure that peerScore for the second peer is changed
newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
Expand Down Expand Up @@ -472,7 +481,7 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
dial(swarm1, swarm2)

// create client + server(it does not have needed headers)
exchg, _ := createP2PExAndServer(t, host0, host1)
exchg, store := createP2PExAndServer(t, host0, host1)
exchg.Params.RangeRequestTimeout = time.Millisecond * 100
// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
Expand All @@ -490,7 +499,11 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
exchg.peerTracker.peerLk.Unlock()
_, err = exchg.GetRangeByHeight(context.Background(), 1, 3)

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
require.NoError(t, err)
newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
assert.NotEqual(t, newPeerScore, prevScore)
Expand All @@ -500,7 +513,7 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
// from server, Exchange will re-request remaining headers from another peer
func TestExchange_RequestPartialRange(t *testing.T) {
hosts := createMocknet(t, 3)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
Expand All @@ -518,7 +531,11 @@ func TestExchange_RequestPartialRange(t *testing.T) {
// reducing peerScore of the second server, so our exchange will request host[1] first.
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
exchg.peerTracker.peerLk.Unlock()
h, err := exchg.GetRangeByHeight(ctx, 1, 8)

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

h, err := exchg.GetRangeByHeight(ctx, gen, 8)
require.NotNil(t, h)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
}

// might be a case when store hasn't synced yet to the requested range
if uint64(head.Height()) < from {
if head.Height() < from {
span.SetStatus(codes.Error, header.ErrNotFound.Error())
log.Debugw("server: requested headers not stored",
"from", from,
Expand All @@ -229,7 +229,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
to = uint64(head.Height()) + 1
}

headersByRange, err := serv.store.GetRangeByHeight(ctx, from, to)
headersByRange, err := serv.store.GetRange(ctx, from, to)
if err != nil {
span.SetStatus(codes.Error, err.Error())
if errors.Is(err, context.DeadlineExceeded) {
Expand Down
21 changes: 21 additions & 0 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
"github.com/celestiaorg/go-header/store"
)
Expand All @@ -30,3 +31,23 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
_, err = server.handleRequest(1, 200)
require.Error(t, err)
}

func TestExchangeServer_errorsOnLargeRequest(t *testing.T) {
peer := createMocknet(t, 1)
s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
require.NoError(t, err)
server, err := NewExchangeServer[*headertest.DummyHeader](
peer[0],
s,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
err = server.Start(context.Background())
require.NoError(t, err)
t.Cleanup(func() {
server.Stop(context.Background()) //nolint:errcheck
})

_, err = server.handleRequest(1, header.MaxRangeRequestSize*2)
require.Error(t, err)
}
2 changes: 1 addition & 1 deletion p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *session[H]) doRequest(
responseLn := uint64(len(h))
// ensure that we received the correct amount of headers.
if responseLn < req.Amount {
from := uint64(h[responseLn-1].Height())
from := h[responseLn-1].Height()
amount := req.Amount - responseLn

select {
Expand Down
Loading

0 comments on commit 26daa91

Please sign in to comment.