This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from filecoin-project/feat/CAR-transfer
Stream IPLD traversal over libp2p with CAR files
- Loading branch information
Showing
11 changed files
with
2,140 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
module github.com/filecoin-project/saturn-l2 | ||
|
||
go 1.16 | ||
|
||
require ( | ||
github.com/ipfs/go-cid v0.1.0 | ||
github.com/ipfs/go-ipfs-blockstore v1.2.0 | ||
github.com/ipfs/go-log/v2 v2.5.1 | ||
github.com/ipld/go-car v0.3.3 | ||
github.com/ipld/go-car/v2 v2.1.2-0.20220429070120-51b5cbdd49db | ||
github.com/ipld/go-ipld-prime v0.16.0 | ||
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 | ||
github.com/libp2p/go-libp2p v0.19.2 | ||
github.com/libp2p/go-libp2p-core v0.15.1 | ||
github.com/stretchr/testify v1.7.0 | ||
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package libp2pcarserver | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"encoding/json" | ||
"io" | ||
"io/ioutil" | ||
"time" | ||
|
||
logging "github.com/ipfs/go-log/v2" | ||
|
||
bstore "github.com/ipfs/go-ipfs-blockstore" | ||
|
||
"github.com/filecoin-project/saturn-l2/store" | ||
|
||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
|
||
"github.com/libp2p/go-libp2p-core/host" | ||
|
||
cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
"github.com/ipld/go-ipld-prime/storage/bsadapter" | ||
|
||
"github.com/ipld/go-car/v2" | ||
) | ||
|
||
const CARTransferProtocol = "/saturn/l2/car/1.0" // car transfer protocol | ||
var log = logging.Logger("car-transfer") | ||
|
||
var ( | ||
maxRequestSize = 1048576 // 1 MiB - max size of the CAR transfer request | ||
readDeadline = 10 * time.Second | ||
writeDeadline = 30 * time.Minute | ||
) | ||
|
||
// Libp2pCARServer serves CAR files for a given root and selector over the libp2p CARTransferProtocol. | ||
type Libp2pCARServer struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
h host.Host | ||
|
||
cs *store.CARStore | ||
} | ||
|
||
func New(h host.Host, cs *store.CARStore) *Libp2pCARServer { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
return &Libp2pCARServer{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
h: h, | ||
cs: cs, | ||
} | ||
} | ||
|
||
func (l *Libp2pCARServer) Start() { | ||
l.h.SetStreamHandler(protocol.ID(CARTransferProtocol), l.serveCARFile) | ||
} | ||
|
||
func (l *Libp2pCARServer) Stop() { | ||
l.cancel() | ||
l.h.RemoveStreamHandler(protocol.ID(CARTransferProtocol)) | ||
} | ||
|
||
func (l *Libp2pCARServer) serveCARFile(s network.Stream) { | ||
defer s.Close() | ||
|
||
// Set a deadline on reading from the stream so it does NOT hang | ||
_ = s.SetReadDeadline(time.Now().Add(readDeadline)) | ||
defer s.SetReadDeadline(time.Time{}) // nolint | ||
|
||
reqBz, err := ioutil.ReadAll(io.LimitReader(s, network.MessageSizeMax)) | ||
if err != nil { | ||
return | ||
} | ||
|
||
// read the json car transfer request | ||
var req CARTransferRequest | ||
if err := json.Unmarshal(reqBz, &req); err != nil { | ||
return | ||
} | ||
dr, err := carRequestToDAGRequest(&req) | ||
if err != nil { | ||
return | ||
} | ||
log.Debugw("car transfer request", "base64-root", req.Root, "base64-selector", req.Selector) | ||
|
||
// Set a deadline on writing to the stream so it doesn't hang | ||
_ = s.SetWriteDeadline(time.Now().Add(writeDeadline)) | ||
defer s.SetWriteDeadline(time.Time{}) // nolint | ||
|
||
if err := l.cs.WithStore(dr.root, func(ro bstore.Blockstore) error { | ||
ls := cidlink.DefaultLinkSystem() | ||
bsa := bsadapter.Adapter{Wrapped: ro} | ||
ls.SetReadStorage(&bsa) | ||
|
||
bf := bufio.NewWriter(s) | ||
defer bf.Flush() | ||
|
||
_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, bf) | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
}); err != nil { | ||
log.Errorw("car transfer failed", "base64-root", req.Root, "base64-selector", req.Selector, "err", err) | ||
} | ||
log.Debugw("car transfer successful", "base64-root", req.Root, "base64-selector", req.Selector) | ||
|
||
// TODO record sent bytes | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package libp2pcarserver | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/base64" | ||
"encoding/json" | ||
"io/ioutil" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
bstore "github.com/ipfs/go-ipfs-blockstore" | ||
"github.com/ipld/go-car" | ||
|
||
"github.com/filecoin-project/saturn-l2/store" | ||
|
||
"github.com/ipld/go-ipld-prime/codec/dagcbor" | ||
|
||
"github.com/ipld/go-car/v2/blockstore" | ||
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" | ||
|
||
"github.com/libp2p/go-libp2p-core/peer" | ||
|
||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSimpleTransfer(t *testing.T) { | ||
ctx := context.Background() | ||
mn := mocknet.New() | ||
|
||
p1, err := mn.GenPeer() | ||
require.NoError(t, err) | ||
p2, err := mn.GenPeer() | ||
require.NoError(t, err) | ||
cs, err := store.NewCARStore(t.TempDir()) | ||
require.NoError(t, err) | ||
l := New(p2, cs) | ||
l.Start() | ||
|
||
require.NoError(t, mn.LinkAll()) | ||
|
||
p1.Peerstore().AddAddrs(p2.ID(), p2.Addrs(), 1*time.Hour) | ||
require.NoError(t, p1.Connect(ctx, peer.AddrInfo{ID: p2.ID()})) | ||
|
||
s, err := p1.NewStream(ctx, p2.ID(), CARTransferProtocol) | ||
require.NoError(t, err) | ||
|
||
from, err := blockstore.OpenReadOnly("../testdata/files/sample-v1.car") | ||
require.NoError(t, err) | ||
rts, err := from.Roots() | ||
require.NoError(t, err) | ||
require.NoError(t, from.Close()) | ||
// add the car file to the car store | ||
require.NoError(t, cs.Create(rts[0], rts[0], func(bs bstore.Blockstore) error { | ||
f, err := os.Open("../testdata/files/sample-v1.car") | ||
if err != nil { | ||
return err | ||
} | ||
if _, err := car.LoadCar(ctx, bs, f); err != nil { | ||
return err | ||
} | ||
if err := f.Close(); err != nil { | ||
return err | ||
} | ||
return nil | ||
})) | ||
|
||
rtbz := rts[0].Bytes() | ||
|
||
bf := bytes.Buffer{} | ||
require.NoError(t, dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, &bf)) | ||
|
||
req := CARTransferRequest{ | ||
Root: base64.StdEncoding.EncodeToString(rtbz), | ||
Selector: base64.StdEncoding.EncodeToString(bf.Bytes()), | ||
} | ||
reqBz, err := json.Marshal(req) | ||
require.NoError(t, err) | ||
_, err = s.Write(reqBz) | ||
require.NoError(t, err) | ||
require.NoError(t, s.CloseWrite()) | ||
|
||
resp, err := ioutil.ReadAll(s) | ||
require.NoError(t, err) | ||
require.NotEmpty(t, resp) | ||
|
||
// ensure contents match | ||
fbz, err := ioutil.ReadFile("../testdata/files/sample-v1.car") | ||
require.NoError(t, err) | ||
require.EqualValues(t, fbz, resp) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package libp2pcarserver | ||
|
||
import ( | ||
"bytes" | ||
"encoding/base64" | ||
"fmt" | ||
|
||
"github.com/ipfs/go-cid" | ||
"github.com/ipld/go-ipld-prime" | ||
"github.com/ipld/go-ipld-prime/codec/dagcbor" | ||
basicnode "github.com/ipld/go-ipld-prime/node/basic" | ||
) | ||
|
||
// CARTransferRequest is the request sent by the client to transfer a CAR file | ||
// for the given root and selector | ||
type CARTransferRequest struct { | ||
Root string // base64 encoded byte array | ||
Selector string // base 64 encoded byte array | ||
} | ||
|
||
type dagTraversalRequest struct { | ||
root cid.Cid | ||
selector ipld.Node | ||
} | ||
|
||
func carRequestToDAGRequest(req *CARTransferRequest) (*dagTraversalRequest, error) { | ||
rootbz, err := base64.StdEncoding.DecodeString(req.Root) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to decode root: %s", err) | ||
} | ||
rootcid, err := cid.Cast(rootbz) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to cast root to cid: %s", err) | ||
} | ||
|
||
selbz, err := base64.StdEncoding.DecodeString(req.Selector) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to decode selector: %s", err) | ||
} | ||
sel, err := decodeSelector(selbz) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to decode selector to ipld node: %s", err) | ||
} | ||
|
||
return &dagTraversalRequest{ | ||
root: rootcid, | ||
selector: sel, | ||
}, nil | ||
} | ||
|
||
func decodeSelector(sel []byte) (ipld.Node, error) { | ||
nb := basicnode.Prototype.Any.NewBuilder() | ||
if err := dagcbor.Decode(nb, bytes.NewReader(sel)); err != nil { | ||
return nil, err | ||
} | ||
return nb.Build(), nil | ||
} |
Oops, something went wrong.