From eaceb095c38007ce68d7f829d095bafca33ba592 Mon Sep 17 00:00:00 2001
From: Elad
Date: Fri, 24 May 2019 17:21:07 +0200
Subject: [PATCH] bzzeth: complete protocol flow

---
 bzzeth/bzzeth.go      | 430 ++++++++++++++++++++++++++++++++++++++++++
 bzzeth/bzzeth_test.go | 247 ++++++++++++++++++++++++
 bzzeth/peer.go        | 170 +++++++++++++++++
 bzzeth/wire.go        |  64 +++++++
 swarm.go              |  29 ++-
 5 files changed, 932 insertions(+), 8 deletions(-)
 create mode 100644 bzzeth/bzzeth.go
 create mode 100644 bzzeth/bzzeth_test.go
 create mode 100644 bzzeth/peer.go
 create mode 100644 bzzeth/wire.go

diff --git a/bzzeth/bzzeth.go b/bzzeth/bzzeth.go
new file mode 100644
index 0000000000..a36e1fa991
--- /dev/null
+++ b/bzzeth/bzzeth.go
@@ -0,0 +1,430 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package bzzeth
+
+import (
+    "context"
+    "encoding/hex"
+    "sync"
+    "time"
+
+    "github.com/ethereum/go-ethereum/crypto"
+    "github.com/ethereum/go-ethereum/node"
+    "github.com/ethereum/go-ethereum/p2p"
+    "github.com/ethereum/go-ethereum/rpc"
+    "github.com/ethersphere/swarm/chunk"
+    "github.com/ethersphere/swarm/log"
+    "github.com/ethersphere/swarm/network"
+    "github.com/ethersphere/swarm/network/timeouts"
+    "github.com/ethersphere/swarm/p2p/protocols"
+    "github.com/ethersphere/swarm/spancontext"
+    "github.com/ethersphere/swarm/storage"
+    // "github.com/ethersphere/swarm/storage/localstore"
+)
+
+// BzzEth implements node.Service
+var _ node.Service = &BzzEth{}
+
+// BzzEth is a global module handling ethereum state on swarm
+type BzzEth struct {
+    peers    *peers            // bzzeth peer pool
+    netStore *storage.NetStore // netstore to retrieve and store
+    kad      *network.Kademlia // kademlia to determine if a header chunk belongs to us
+    quit     chan struct{}     // quit channel to close go routines
+}
+
+// New constructs the BzzEth node service
+func New(ns *storage.NetStore, kad *network.Kademlia) *BzzEth {
+    return &BzzEth{
+        peers:    newPeers(),
+        netStore: ns,
+        kad:      kad,
+        quit:     make(chan struct{}),
+    }
+}
+
+// Run is the bzzeth protocol run function.
+// - creates a peer
+// - checks if it is a swarm node, puts the protocol in idle mode
+// - performs handshake
+// - adds peer to the peerpool
+// - starts incoming message handler loop
+func (b *BzzEth) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+    peer := protocols.NewPeer(p, rw, Spec)
+    bp := NewPeer(peer)
+
+    // perform handshake and register if peer serves headers
+    handshake, err := bp.Handshake(context.TODO(), Handshake{ServeHeaders: true}, nil)
+    if err != nil {
+        return err
+    }
+    bp.serveHeaders = handshake.(*Handshake).ServeHeaders
+    log.Warn("handshake", "hs", handshake, "peer", bp)
+
+    // with another swarm node the protocol goes into idle
+    if isSwarmNodeFunc(bp) {
+        <-b.quit
+        return nil
+    }
+    b.peers.add(bp)
+    defer b.peers.remove(bp)
+
+    return peer.Run(b.handleMsg(bp))
+}
+
+// handleMsg is the message handler that delegates incoming messages
+// handlers are called asynchronously so handler calls do not block incoming msg processing
+func (b *BzzEth) handleMsg(p *Peer) func(context.Context, interface{}) error {
+    return func(ctx context.Context, msg interface{}) error {
+        p.logger.Debug("bzzeth.handleMsg")
+        switch msg := msg.(type) {
+        case *NewBlockHeaders:
+            go b.handleNewBlockHeaders(ctx, p, msg)
+        case *GetBlockHeaders:
+            go b.handleGetBlockHeaders(ctx, p, msg)
+        case *BlockHeaders:
+            go b.handleBlockHeaders(ctx, p, msg)
+        }
+        return nil
+    }
+}
+
+// handleNewBlockHeaders handles new header hashes. Strategy: only request headers
+// that fall within the node's kademlia nearest neighbourhood
+func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlockHeaders) {
+    p.logger.Debug("bzzeth.handleNewBlockHeaders")
+    // collect the hashes of block headers we want
+    var hashes [][]byte
+    for _, h := range msg.Headers {
+        if wantHeaderFunc(h.Hash, b.kad) {
+            hashes = append(hashes, h.Hash)
+        }
+    }
+
+    // request them from the offering peer and deliver in a channel
+    deliveries := make(chan []byte)
+    req, err := p.getBlockHeaders(ctx, hashes, deliveries)
+    if err != nil {
+        p.logger.Error("bzzeth.handleNewBlockHeaders: could not send GetBlockHeaders", "err", err)
+        return
+    }
+    defer req.cancel()
+
+    deliveredCnt := 0
+    // this loop blocks until all delivered or context done
+    // only needed to log results
+    for {
+        select {
+        case _, ok := <-deliveries:
+            if !ok {
+                p.logger.Debug("bzzeth.handleNewBlockHeaders", "delivered", deliveredCnt)
+                return
+            }
+            deliveredCnt++
+            if deliveredCnt == len(hashes) {
+                p.logger.Debug("bzzeth.handleNewBlockHeaders", "delivered", deliveredCnt)
+                return
+            }
+        case <-ctx.Done():
+            p.logger.Debug("bzzeth.handleNewBlockHeaders", "delivered", deliveredCnt, "err", ctx.Err())
+            return
+        }
+    }
+}
+
+// wantHeaderFunc is used to determine if we need a particular header offered as latest
+// by an eth fullnode
+// tests reassign this to control behaviour
+var wantHeaderFunc = wantHeader
+
+// wantHeader returns true iff the hash argument falls in the NN of kademlia
+func wantHeader(hash []byte, kad *network.Kademlia) bool {
+    return chunk.Proximity(kad.BaseAddr(), hash) >= kad.NeighbourhoodDepth()
+}
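To make the neighbourhood test above concrete, here is a minimal standalone sketch of the check wantHeader performs; the local proximity helper, base address, hash and depth are illustrative stand-ins for chunk.Proximity, kad.BaseAddr() and kad.NeighbourhoodDepth(), not the library code itself.

package main

import "fmt"

// proximity returns the proximity order of two byte slices: the number of
// leading bits they share (sketch of the semantics of chunk.Proximity).
func proximity(a, b []byte) int {
    for i := 0; i < len(a) && i < len(b); i++ {
        x := a[i] ^ b[i]
        if x == 0 {
            continue
        }
        for j := 0; j < 8; j++ {
            if x&(0x80>>uint(j)) != 0 {
                return i*8 + j
            }
        }
    }
    if len(a) < len(b) {
        return len(a) * 8
    }
    return len(b) * 8
}

func main() {
    base := []byte{0xaa, 0x00} // this node's overlay address (made up)
    hash := []byte{0xab, 0xff} // an offered header hash (made up)
    depth := 4                 // assumed neighbourhood depth

    // a header is "wanted" iff its hash is at least as close to the node's
    // address as the current neighbourhood depth
    po := proximity(base, hash)
    fmt.Println("po:", po, "wanted:", po >= depth)
}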
log.Debug("bzzeth.requestAll: netstore.Get can not retrieve chunk", "ref", hex.EncodeToString(h), "err", err) + select { + case missingHeaders <- h: // fallback: request header from eth peers + case <-ctx.Done(): + } + return + } + select { + case deliveries <- header: + case <-ctx.Done(): + } + }() + select { + case <-ctx.Done(): + break BZZ + default: + } + } + + // fall back to retrieval from eth clients + // collect missing block header hashes + // terminates after missingHeaders is read and closed or context is done + go b.getBlockHeadersEth(ctx, missingHeaders, deliveries) + + // wait till all hashes are requested from swarm, then close missingHeaders channel + // this cannot block as this function is called async + wg.Done() + wg.Wait() + close(missingHeaders) +} + +// getBlockHeadersEth manages fetching headers from ethereum bzzeth nodes +// This is part of the response to GetBlockHeaders requests by bzzeth light/syncing nodes +// As a fallback after header retrieval from local storage and swarm network are unsuccessful +// When called, it +// - reads requested header hashes from a channel (headerC) and +// - creates batch requests and sends them to an adequate bzzeth peer +// - channels the responses into a delivery channel (deliveries) +func (b *BzzEth) getBlockHeadersEth(ctx context.Context, headersC, deliveries chan []byte) { + // read header requests into batches + readNext := make(chan chan [][]byte) + batches := make(chan [][]byte) + go readToBatches(headersC, readNext) + readNext <- batches + + // send GetBlockHeader requests to adequate bzzeth peers + // this loop terminates when batches channel is closed as a result of input headersC being closed + var reqs []*request + for headers := range batches { + p := b.peers.getEth() // find candidate peer to serve the headers + if p == nil { // if no peer found just skip the batch TODO: smarter retry? + continue + } + // initiate request with the chosen peer + req, err := p.getBlockHeaders(ctx, headers, deliveries) + if err != nil { // in case of failure, no retries TODO: smarter retry? + continue + } + reqs = append(reqs, req) // remember the request so that it can be cancelled + } + cancelAll(reqs...) 
+
+// cancelAll cancels all requests given as arguments
+func cancelAll(reqs ...*request) {
+    for _, req := range reqs {
+        req.cancel()
+    }
+}
+
+// getBlockHeaderBzz retrieves a block header by its hash from swarm
+func (b *BzzEth) getBlockHeaderBzz(ctx context.Context, hash []byte) ([]byte, error) {
+    req := &storage.Request{
+        Addr: hash,
+        // Origin: b.ID(),
+    }
+    ch, err := b.netStore.Get(ctx, chunk.ModeGetRequest, req)
+    if err != nil {
+        return nil, err
+    }
+    return ch.Data(), nil
+}
+
+// handleGetBlockHeaders handles GetBlockHeaders requests; in the protocol handler this call is asynchronous
+// so it is safe to have it run until delivery is finished
+func (b *BzzEth) handleGetBlockHeaders(ctx context.Context, p *Peer, msg *GetBlockHeaders) {
+    total := len(msg.Hashes)
+    p.logger.Debug("bzzeth.handleGetBlockHeaders", "id", msg.ID)
+    ctx, osp := spancontext.StartSpan(ctx, "bzzeth.handleGetBlockHeaders")
+    defer osp.Finish()
+
+    // deliver in batches, this blocks until the total number of requested headers is delivered or considered not found
+    deliveries := make(chan []byte)
+    trigger := make(chan chan [][]byte)
+    batches := make(chan [][]byte)
+    defer close(trigger)
+    go readToBatches(deliveries, trigger)
+
+    // asynchronously request all headers as swarm chunks
+    go b.requestAll(ctx, deliveries, msg.Hashes)
+    trigger <- batches // hand the first batch channel to the batch reader
+    deliveredCnt := 0
+    var err error
+    // this loop terminates if
+    // - batches channel is closed (because the underlying deliveries channel is closed) OR
+    // - context is done
+    // the implementation aspires to send as many as possible as early as possible
+DELIVERY:
+    for headers := range batches {
+        deliveredCnt += len(headers)
+        if err = p.Send(ctx, &BlockHeaders{
+            ID:      msg.ID,
+            Headers: headers,
+        }); err != nil { // in case of a send error, the peer will disconnect so we can safely return
+            break DELIVERY
+        }
+        select {
+        case trigger <- batches: // signal that we are ready for another batch
+        case <-ctx.Done():
+            break DELIVERY
+        }
+    }
+
+    p.logger.Debug("bzzeth.handleGetBlockHeaders", "id", msg.ID, "total", total, "delivered", deliveredCnt, "err", err)
+
+    if err == nil && deliveredCnt < total { // if there was no send error and we delivered fewer headers than requested
+        p.Send(ctx, &BlockHeaders{ID: msg.ID}) // it is prudent to send an empty BlockHeaders message
+    }
+}
+
+// handleBlockHeaders handles block headers message
+func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHeaders) {
+    p.logger.Debug("bzzeth.handleBlockHeaders", "id", msg.ID)
+
+    // retrieve the request for this id :TODO:
+    req, ok := p.requests.get(msg.ID)
+    if !ok {
+        p.logger.Warn("bzzeth.handleBlockHeaders: nonexistent request id", "id", msg.ID)
+        p.Drop()
+        return
+    }
+    err := b.deliverAll(ctx, req.c, msg.Headers)
+    if err != nil {
+        p.logger.Warn("bzzeth.handleBlockHeaders: fatal dropping peer", "id", msg.ID, "err", err)
+        p.Drop()
+    }
+}
+
+// deliverAll stores the delivered headers
+func (b *BzzEth) deliverAll(ctx context.Context, deliveries chan []byte, headers [][]byte) error {
+    errc := make(chan error, 1)       // only the first error propagates
+    go b.storeAll(ctx, errc, headers) // storing all headers
+    return <-errc
+}
+
+// storeAll stores all headers asynchronously, reports the first store error on errc
+func (b *BzzEth) storeAll(ctx context.Context, errc chan error, headers [][]byte) {
+    defer close(errc)
+    var wg sync.WaitGroup
+    for _, h := range headers {
+        h := h
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            // TODO: unsolicited header validation should come here
+            // TODO: header validation should come here
+            if err := b.store(ctx, h); err != nil {
+                select {
+                case errc <- err: // buffered channel, first error wins
+                default: // there is already an error, ignore
+                }
+            }
+        }()
+    }
+    wg.Wait() // close errc only after all store attempts are done
+}
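The error propagation used by deliverAll/storeAll boils down to a "first error wins" buffered channel; a self-contained sketch of the pattern under made-up failure conditions:

package main

import (
    "errors"
    "fmt"
    "sync"
)

func main() {
    errc := make(chan error, 1)

    go func() {
        defer close(errc)
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            i := i
            wg.Add(1)
            go func() {
                defer wg.Done()
                if i == 3 { // pretend one store fails
                    select {
                    case errc <- errors.New("invalid chunk"): // first error wins
                    default: // an error is already buffered, drop this one
                    }
                }
            }()
        }
        wg.Wait()
    }()

    // the caller blocks on the first error, or gets nil once errc is closed
    fmt.Println("result:", <-errc)
}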
+
+// store stores a header as a chunk, returns error if and only if the chunk is invalid
+func (b *BzzEth) store(ctx context.Context, header []byte) error {
+    ch := newChunk(header)
+    _, err := b.netStore.Put(ctx, chunk.ModePutSync, ch)
+    if err != nil {
+        log.Warn("bzzeth.store", "hash", ch.Address().Hex(), "err", err)
+        // ignore all other errors, but invalid chunk incurs peer drop
+        if err == chunk.ErrChunkInvalid {
+            return err
+        }
+    }
+    return nil
+}
+
+// newChunk creates a new content addressed chunk from data using the Keccak256 SHA3 hash
+func newChunk(data []byte) chunk.Chunk {
+    hash := crypto.Keccak256(data)
+    return chunk.NewChunk(hash, data)
+}
+
+// Protocols returns the p2p protocol
+func (b *BzzEth) Protocols() []p2p.Protocol {
+    return []p2p.Protocol{
+        {
+            Name:    Spec.Name,
+            Version: Spec.Version,
+            Length:  Spec.Length(),
+            Run:     b.Run,
+        },
+    }
+}
+
+// APIs returns the APIs defined on the node service
+func (b *BzzEth) APIs() []rpc.API {
+    return nil
+}
+
+// Start starts the BzzEth node service
+func (b *BzzEth) Start(server *p2p.Server) error {
+    log.Info("bzzeth starting...")
+    return nil
+}
+
+// Stop stops the BzzEth node service
+func (b *BzzEth) Stop() error {
+    log.Info("bzzeth shutting down...")
+    close(b.quit)
+    return nil
+}
+
+var batchWait = 100 * time.Millisecond
+
+// readToBatches reads items from the in channel into a buffer and
+// sends non-empty buffers on a batch channel received from out
+func readToBatches(in chan []byte, out chan chan [][]byte) {
+    var buffer [][]byte
+    var trigger chan chan [][]byte
+BATCH:
+    for {
+        select {
+        case batches := <-trigger: // new batch channel available
+            if batches == nil { // terminate if batches channel is closed, no more batches accepted
+                return
+            }
+            batches <- buffer // otherwise write buffer into batch channel
+            if in == nil { // terminate if in channel is already closed, sent last batch
+                return
+            }
+            buffer = nil  // otherwise start new buffer
+            trigger = nil // block this case: disallow new batches until enough in buffer
+
+        case item, more := <-in: // reading input
+            if !more {
+                in = nil       // block this case: disallow read from closed channel
+                continue BATCH // wait till last batch can send
+            }
+            // otherwise collect item in buffer
+            buffer = append(buffer, item)
+
+        default:
+            if len(buffer) > 0 { // if buffer is not empty
+                trigger = out  // allow sending batch
+                continue BATCH // wait till next batch can send
+            }
+            time.Sleep(batchWait) // otherwise wait and continue
+        }
+    }
+}
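A short illustration of why newChunk makes headers retrievable by block hash: the chunk address is the Keccak256 hash of the data, which for an RLP-encoded header is exactly the Ethereum block hash. The header bytes below are placeholders, not a real encoded header.

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/crypto"
)

func main() {
    // Placeholder bytes standing in for an RLP-encoded block header.
    header := []byte("example header bytes")

    // The chunk address is the Keccak256 hash of the data, so a header stored
    // this way can later be looked up in Swarm by its block hash alone.
    addr := crypto.Keccak256(header)
    fmt.Printf("chunk address / block hash: %x\n", addr)
}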
diff --git a/bzzeth/bzzeth_test.go b/bzzeth/bzzeth_test.go
new file mode 100644
index 0000000000..c2f5f8af80
--- /dev/null
+++ b/bzzeth/bzzeth_test.go
@@ -0,0 +1,247 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package bzzeth
+
+import (
+    "bytes"
+    "errors"
+    "flag"
+    "os"
+    "testing"
+    "time"
+
+    "github.com/ethereum/go-ethereum/crypto"
+    "github.com/ethereum/go-ethereum/log"
+    "github.com/ethereum/go-ethereum/p2p/enode"
+    "github.com/ethersphere/swarm/network"
+    p2ptest "github.com/ethersphere/swarm/p2p/testing"
+)
+
+var (
+    loglevel = flag.Int("loglevel", 0, "verbosity of logs")
+)
+
+func init() {
+    flag.Parse()
+
+    log.PrintOrigins(true)
+    log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
+}
+
+func newBzzEthTester() (*p2ptest.ProtocolTester, *BzzEth, func(), error) {
+    b := New(nil, nil)
+
+    prvkey, err := crypto.GenerateKey()
+    if err != nil {
+        return nil, nil, nil, err
+    }
+
+    protocolTester := p2ptest.NewProtocolTester(prvkey, 1, b.Run)
+    teardown := func() {
+        protocolTester.Stop()
+    }
+
+    return protocolTester, b, teardown, nil
+}
+
+func handshakeExchange(tester *p2ptest.ProtocolTester, peerID enode.ID, serveHeadersPeer, serveHeadersPivot bool) error {
+    return tester.TestExchanges(
+        p2ptest.Exchange{
+            Label: "Handshake",
+            Triggers: []p2ptest.Trigger{
+                {
+                    Code: 0,
+                    Msg: Handshake{
+                        ServeHeaders: serveHeadersPeer,
+                    },
+                    Peer: peerID,
+                },
+            },
+            Expects: []p2ptest.Expect{
+                {
+                    Code: 0,
+                    Msg: Handshake{
+                        ServeHeaders: serveHeadersPivot,
+                    },
+                    Peer: peerID,
+                },
+            },
+        })
+}
+
+// TestBzzEthHandshake tests the handshake between an eth node and a swarm node:
+// on a successful handshake the protocol does not go idle,
+// the peer is added to the pool and its serveHeaders flag is registered
+func TestBzzEthHandshake(t *testing.T) {
+    tester, b, teardown, err := newBzzEthTester()
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer teardown()
+
+    node := tester.Nodes[0]
+    err = handshakeExchange(tester, node.ID(), true, true)
+    if err != nil {
+        t.Fatalf("expected no error, got %v", err)
+    }
+
+    // after successful handshake, expect peer added to peer pool
+    var p *Peer
+    for i := 0; i < 10; i++ {
+        p = b.peers.get(node.ID())
+        if p != nil {
+            break
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+    if p == nil {
+        t.Fatal("bzzeth peer not added")
+    }
+
+    if !p.serveHeaders {
+        t.Fatal("bzzeth peer serveHeaders not set")
+    }
+
+    close(b.quit)
+    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("?")})
+    if err == nil || err.Error() != "timed out waiting for peers to disconnect" {
+        t.Fatal(err)
+    }
+}
+
+// TestBzzBzzHandshake tests the handshake between two swarm nodes:
+// the remote peer is recognised as a swarm node, the protocol goes idle
+// and the peer is not added to the pool
+func TestBzzBzzHandshake(t *testing.T) {
+    tester, b, teardown, err := newBzzEthTester()
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer teardown()
+
+    // redefine isSwarmNodeFunc to force recognising the remote peer as a swarm node
+    defer func(f func(*Peer) bool) {
+        isSwarmNodeFunc = f
+    }(isSwarmNodeFunc)
+    isSwarmNodeFunc = func(_ *Peer) bool { return true }
+
+    node := tester.Nodes[0]
+    err = handshakeExchange(tester, node.ID(), false, true)
+    if err != nil {
+        t.Fatalf("expected no error, got %v", err)
+    }
+
+    // after the handshake expect the protocol to hang, peer not added to the pool
+    p := b.peers.get(node.ID())
+    if p != nil {
+        t.Fatal("bzzeth swarm peer incorrectly added")
+    }
+
+    // after closing the protocol, expect disconnect
+    close(b.quit)
+    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("protocol returned")})
+    if err != nil {
+        t.Fatal(err)
+    }
+}
+
+func newBlockHeaderExchange(tester *p2ptest.ProtocolTester, peerID enode.ID, requestID uint32, offered []HeaderHash, wanted [][]byte) error {
+    return tester.TestExchanges(
+        p2ptest.Exchange{
+            Label: "NewBlockHeaders",
+            Triggers: []p2ptest.Trigger{
+                {
+                    Code: 1,
+                    Msg: NewBlockHeaders{
+                        Headers: offered,
+                    },
+                    Peer: peerID,
+                },
+            },
+            Expects: []p2ptest.Expect{
+                {
+                    Code: 2,
+                    Msg: GetBlockHeaders{
+                        ID:     requestID,
+                        Hashes: wanted,
+                    },
+                    Peer: peerID,
+                },
+            },
+        })
+}
+
+// TestNewBlockHeaders tests that when a full eth node sends new block header hashes,
+// the swarm node responds with a GetBlockHeaders message requesting the headers
+// that fall into its kademlia neighbourhood
+func TestNewBlockHeaders(t *testing.T) {
+    // bzz pivot - full eth node peer
+    // NewBlockHeaders trigger, expect GetBlockHeaders response
+    tester, _, teardown, err := newBzzEthTester()
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer teardown()
+
+    offered := make([]HeaderHash, 256)
+    for i := 0; i < len(offered); i++ {
+        offered[i] = HeaderHash{crypto.Keccak256([]byte{uint8(i)}), []byte{uint8(i)}}
+    }
+
+    // redefine wantHeaderFunc for this test
+    defer func(f func([]byte, *network.Kademlia) bool) {
+        wantHeaderFunc = f
+    }(wantHeaderFunc)
+    wantedIndexes := []int{1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233}
+    wantHeaderFunc = func(hash []byte, _ *network.Kademlia) bool {
+        for _, i := range wantedIndexes {
+            if bytes.Equal(hash, offered[i].Hash) {
+                return true
+            }
+        }
+        return false
+    }
+
+    wanted := make([][]byte, len(wantedIndexes))
+    for i, w := range wantedIndexes {
+        wanted[i] = crypto.Keccak256([]byte{uint8(w)})
+    }
+
+    // overwrite newRequestIDFunc to be deterministic
+    defer func(f func() uint32) {
+        newRequestIDFunc = f
+    }(newRequestIDFunc)
+
+    newRequestIDFunc = func() uint32 {
+        return 42
+    }
+
+    node := tester.Nodes[0]
+
+    err = handshakeExchange(tester, node.ID(), true, true)
+    if err != nil {
+        t.Fatalf("expected no error, got %v", err)
+    }
+
+    err = newBlockHeaderExchange(tester, node.ID(), 42, offered, wanted)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // TODO: subsequent NewBlockHeaders msg are not allowed
+    // TODO: headers found in localstore should not be requested
+    // TODO: after requested headers arrive and are stored
+    // TODO: unrequested header delivery drops
+}
diff --git a/bzzeth/peer.go b/bzzeth/peer.go
new file mode 100644
index 0000000000..ff27910422
--- /dev/null
+++ b/bzzeth/peer.go
@@ -0,0 +1,170 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package bzzeth
+
+import (
+    "context"
+    "math/rand"
+    "sync"
+
+    "github.com/ethereum/go-ethereum/log"
+    "github.com/ethereum/go-ethereum/p2p/enode"
+    "github.com/ethersphere/swarm/p2p/protocols"
+)
+
+// Peer extends p2p/protocols Peer and represents a concrete protocol connection
+type Peer struct {
+    *protocols.Peer            // embeds protocols.Peer
+    serveHeaders bool          // if the remote serves headers
+    requests     *requests     // per-peer pool of open requests
+    logger       log.Logger    // custom logger for peer
+}
+
+// NewPeer is the constructor for Peer
+func NewPeer(peer *protocols.Peer) *Peer {
+    return &Peer{
+        Peer:     peer,
+        requests: newRequests(),
+        logger:   log.New("peer", peer.ID()),
+    }
+}
+
+// peers represents the bzzeth specific peer pool
+type peers struct {
+    mtx   sync.RWMutex
+    peers map[enode.ID]*Peer
+}
+
+func newPeers() *peers {
+    return &peers{peers: make(map[enode.ID]*Peer)}
+}
+
+func (p *peers) get(id enode.ID) *Peer {
+    p.mtx.RLock()
+    defer p.mtx.RUnlock()
+    return p.peers[id]
+}
+
+func (p *peers) add(peer *Peer) {
+    p.mtx.Lock()
+    defer p.mtx.Unlock()
+    p.peers[peer.ID()] = peer
+}
+
+func (p *peers) remove(peer *Peer) {
+    p.mtx.Lock()
+    defer p.mtx.Unlock()
+    delete(p.peers, peer.ID())
+}
+
+// getEth returns a peer from the pool that serves headers
+// TODO: implement load balancing of requests in case of multiple peers
+func (p *peers) getEth() (peer *Peer) {
+    p.mtx.RLock()
+    defer p.mtx.RUnlock()
+    for _, peer = range p.peers {
+        if peer.serveHeaders {
+            break
+        }
+    }
+    return peer
+}
+
+// requests represents the peer specific pool of open requests
+type requests struct {
+    mtx sync.RWMutex        // for concurrent access to requests
+    r   map[uint32]*request // requests open for peer
+}
+
+type request struct {
+    id     uint32 // id the request is registered under, also used as the wire message id
+    hashes map[string]bool
+    c      chan []byte
+    cancel func()
+}
+
+// newRequestIDFunc is used to generate unique IDs for requests
+// tests can reassign it for deterministic ids
+var newRequestIDFunc = newRequestID
+
+// newRequestID generates a 32-bit random number to be used as a unique id for requests
+// no reuse of id across peers
+func newRequestID() uint32 {
+    return rand.Uint32()
+}
+
+func newRequests() *requests {
+    return &requests{
+        r: make(map[uint32]*request),
+    }
+}
+
+// create constructs a new request
+// registers it on the peer request pool
+// request.cancel() should be called to clean up
+func (r *requests) create(c chan []byte) *request {
+    req := &request{
+        hashes: make(map[string]bool),
+        c:      c,
+    }
+    id := newRequestIDFunc()
+    req.id = id
+    req.cancel = func() { r.remove(id) }
+    r.add(id, req)
+    return req
+}
+
+func (r *requests) add(id uint32, req *request) {
+    r.mtx.Lock()
+    defer r.mtx.Unlock()
+    r.r[id] = req
+}
+
+func (r *requests) remove(id uint32) {
+    r.mtx.Lock()
+    defer r.mtx.Unlock()
+    delete(r.r, id)
+}
+
+func (r *requests) get(id uint32) (*request, bool) {
+    r.mtx.RLock()
+    defer r.mtx.RUnlock()
+    req, ok := r.r[id]
+    return req, ok
+}
+
+// getBlockHeaders sends a GetBlockHeaders message to the remote peer requesting headers by their _hashes_
+// and delivers the actual block header responses to the deliveries channel
+func (p *Peer) getBlockHeaders(ctx context.Context, hashes [][]byte, deliveries chan []byte) (*request, error) {
+    req := p.requests.create(deliveries)
+    err := p.Send(ctx, &GetBlockHeaders{
+        ID:     req.id, // use the id the request is registered under so that responses can be matched
+        Hashes: hashes,
+    })
+    if err != nil {
+        req.cancel()
+        return nil, err
+    }
+    return req, nil
+}
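The per-peer request pool above is essentially an id-keyed registry whose cancel closures unregister the request; a simplified standalone sketch of that lifecycle (names are made up, not the package's API):

package main

import (
    "fmt"
    "math/rand"
    "sync"
)

type pool struct {
    mtx sync.Mutex
    r   map[uint32]chan []byte
}

// create registers a delivery channel under a random 32-bit id and returns a
// cancel closure that simply removes the entry again.
func (p *pool) create(c chan []byte) (id uint32, cancel func()) {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    id = rand.Uint32()
    p.r[id] = c
    return id, func() {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        delete(p.r, id)
    }
}

// deliver routes data to the channel registered under id, if any.
func (p *pool) deliver(id uint32, data []byte) bool {
    p.mtx.Lock()
    c, ok := p.r[id]
    p.mtx.Unlock()
    if ok {
        c <- data
    }
    return ok
}

func main() {
    p := &pool{r: make(map[uint32]chan []byte)}
    c := make(chan []byte, 1)

    id, cancel := p.create(c)
    fmt.Println("delivered:", p.deliver(id, []byte("header"))) // true
    cancel()
    fmt.Println("delivered:", p.deliver(id, []byte("header"))) // false: request was cancelled
}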
+
+// isSwarmNodeFunc is called to check if the remote peer is another swarm node,
+// in which case the protocol goes idle
+// it can be reassigned in tests to mock a swarm node
+var isSwarmNodeFunc = isSwarmNode
+
+func isSwarmNode(p *Peer) bool {
+    return p.HasCap("bzz")
+}
diff --git a/bzzeth/wire.go b/bzzeth/wire.go
new file mode 100644
index 0000000000..b0b6ba6ea7
--- /dev/null
+++ b/bzzeth/wire.go
@@ -0,0 +1,64 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package bzzeth
+
+import "github.com/ethersphere/swarm/p2p/protocols"
+
+// Spec is the protocol spec for bzzeth
+var Spec = &protocols.Spec{
+    Name:       "bzzeth",
+    Version:    1,
+    MaxMsgSize: 10 * 1024 * 1024,
+    Messages: []interface{}{
+        Handshake{},
+        NewBlockHeaders{},
+        GetBlockHeaders{},
+        BlockHeaders{},
+    },
+    DisableContext: true,
+}
+
+// Handshake is used between the ethereum node and the Swarm node
+type Handshake struct {
+    ServeHeaders bool // indicates if this node is expected to serve requests for headers
+}
+
+// NewBlockHeaders is sent from the Ethereum client to the Swarm node
+type NewBlockHeaders struct {
+    Headers []HeaderHash
+}
+
+// HeaderHash encodes an ethereum block hash
+type HeaderHash struct {
+    Hash   []byte // block hash
+    Number []byte // block height
+}
+
+// GetBlockHeaders is used between a Swarm node and the Ethereum node in two cases:
+// 1. when an Ethereum node asks for the headers corresponding to the hashes in the message (eth -> bzz)
+// 2. when a Swarm node cannot find a particular header in the network, it asks the ethereum node
+//    for the header in order to push it to the network (bzz -> eth)
+type GetBlockHeaders struct {
+    ID     uint32   // request id
+    Hashes [][]byte // slice of hashes
+}
+
+// BlockHeaders encapsulates the actual header blobs sent as a response to GetBlockHeaders;
+// there may be multiple responses to the same request: whatever the node has, it sends right away
+type BlockHeaders struct {
+    ID      uint32   // request id
+    Headers [][]byte // slice of chunk data (rlp encoded headers)
+}
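Assuming the package lands at github.com/ethersphere/swarm/bzzeth as this patch proposes, one announce/request/response round trip over these messages looks roughly like this; the header bytes and block number are placeholders.

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethersphere/swarm/bzzeth"
)

func main() {
    header := []byte("rlp-encoded header bytes") // placeholder blob
    hash := crypto.Keccak256(header)

    // eth node announces a freshly mined header to the swarm node
    announce := bzzeth.NewBlockHeaders{
        Headers: []bzzeth.HeaderHash{{Hash: hash, Number: []byte{0x01}}},
    }

    // swarm node asks for the subset it wants, correlated by a request id
    req := bzzeth.GetBlockHeaders{ID: 42, Hashes: [][]byte{hash}}

    // eth node answers with the header blobs under the same id
    resp := bzzeth.BlockHeaders{ID: req.ID, Headers: [][]byte{header}}

    fmt.Println(len(announce.Headers), req.ID, len(resp.Headers))
}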
diff --git a/swarm.go b/swarm.go
index 14edcc3064..a6782059ec 100644
--- a/swarm.go
+++ b/swarm.go
@@ -32,11 +32,6 @@ import (
 	"time"
 	"unicode"
 
-	"github.com/ethersphere/swarm/chunk"
-
-	"github.com/ethersphere/swarm/storage/feed"
-	"github.com/ethersphere/swarm/storage/localstore"
-
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/ethclient"
@@ -46,6 +41,8 @@ import (
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/ethersphere/swarm/api"
 	httpapi "github.com/ethersphere/swarm/api/http"
+	"github.com/ethersphere/swarm/bzzeth"
+	"github.com/ethersphere/swarm/chunk"
 	"github.com/ethersphere/swarm/contracts/chequebook"
 	"github.com/ethersphere/swarm/contracts/ens"
 	"github.com/ethersphere/swarm/fuse"
@@ -56,6 +53,8 @@ import (
 	"github.com/ethersphere/swarm/pss"
 	"github.com/ethersphere/swarm/state"
 	"github.com/ethersphere/swarm/storage"
+	"github.com/ethersphere/swarm/storage/feed"
+	"github.com/ethersphere/swarm/storage/localstore"
 	"github.com/ethersphere/swarm/storage/mock"
 	"github.com/ethersphere/swarm/swap"
 	"github.com/ethersphere/swarm/tracing"
@@ -75,6 +74,7 @@ type Swarm struct {
 	dns        api.Resolver       // DNS registrar
 	fileStore  *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support
 	streamer   *stream.Registry
+	bzzEth     *bzzeth.BzzEth
 	bzz        *network.Bzz // the logistic manager
 	backend    chequebook.Backend // simple blockchain Backend
 	privateKey *ecdsa.PrivateKey
@@ -212,6 +212,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
 		MaxPeerServers: config.MaxStreamPeerServers,
 	}
 	self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap)
+
 	tags := chunk.NewTags() //todo load from state store
 
 	// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
@@ -222,6 +223,8 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
 
 	self.bzz = network.NewBzz(bzzconfig, to, self.stateStore, self.streamer.GetSpec(), self.streamer.Run)
 
+	self.bzzEth = bzzeth.New(self.netStore, to)
+
 	// Pss = postal service over swarm (devp2p over bzz)
 	self.ps, err = pss.New(to, config.Pss)
 	if err != nil {
@@ -379,6 +382,11 @@ func (s *Swarm) Start(srv *p2p.Server) error {
 	}
 	log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", s.bzz.Hive.BaseAddr()))
 
+	err = s.bzzEth.Start(srv)
+	if err != nil {
+		return err
+	}
+
 	if s.ps != nil {
 		s.ps.Start(srv)
 	}
@@ -464,7 +472,12 @@ func (s *Swarm) Stop() error {
 	stopCounter.Inc(1)
 	s.streamer.Stop()
 
-	err := s.bzz.Stop()
+	err := s.bzzEth.Stop()
+	if err != nil {
+		log.Error("error during bzz-eth shutdown", "err", err)
+	}
+
+	err = s.bzz.Stop()
 	if s.stateStore != nil {
 		s.stateStore.Close()
 	}
@@ -485,7 +498,7 @@ func (s *Swarm) Protocols() (protos []p2p.Protocol) {
 		protos = append(protos, s.bzz.Protocols()...)
 	} else {
 		protos = append(protos, s.bzz.Protocols()...)
-
+		protos = append(protos, s.bzzEth.Protocols()...)
 		if s.ps != nil {
 			protos = append(protos, s.ps.Protocols()...)
 		}
@@ -532,7 +545,7 @@ func (s *Swarm) APIs() []rpc.API {
 	}
 	apis = append(apis, s.bzz.APIs()...)
-
+	apis = append(apis, s.bzzEth.APIs()...)
 	apis = append(apis, s.streamer.APIs()...)
 
 	if s.ps != nil {
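For completeness, a hypothetical sketch of mounting the new service on a bare devp2p server outside the full Swarm node, using only the APIs added in this patch plus go-ethereum's p2p.Server; the nil stores follow the unit tests and would not survive real header traffic.

package main

import (
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethersphere/swarm/bzzeth"
)

func main() {
    key, err := crypto.GenerateKey()
    if err != nil {
        panic(err)
    }

    // nil NetStore/Kademlia as in the unit tests; a real deployment passes the
    // node's NetStore and Kademlia exactly as NewSwarm does above
    b := bzzeth.New(nil, nil)

    srv := &p2p.Server{Config: p2p.Config{
        PrivateKey:  key,
        MaxPeers:    10,
        NoDiscovery: true,
        Protocols:   b.Protocols(), // register the bzzeth devp2p protocol
    }}
    if err := srv.Start(); err != nil {
        panic(err)
    }
    defer srv.Stop()

    if err := b.Start(srv); err != nil {
        panic(err)
    }
    defer b.Stop()
}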