Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
stream car files over libp2p
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed May 12, 2022
1 parent 076d15e commit 076ddd9
Show file tree
Hide file tree
Showing 9 changed files with 1,951 additions and 0 deletions.
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/filecoin-project/saturn-l2

go 1.16

require (
github.com/ipfs/go-cid v0.1.0
github.com/ipld/go-car/v2 v2.1.2-0.20220429070120-51b5cbdd49db
github.com/ipld/go-ipld-prime v0.16.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73
github.com/libp2p/go-libp2p v0.19.2
github.com/libp2p/go-libp2p-core v0.15.1
github.com/stretchr/testify v1.7.0
)
1,651 changes: 1,651 additions & 0 deletions go.sum

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions libp2pcarserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package libp2pcarserver

import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/libp2p/go-libp2p-core/host"

"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"

"github.com/ipld/go-car/v2"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
)

const CarTransferProtocol = "/saturn/l2/car/1.0" // car transfer protocol

var (
maxRequestSize = 1048576 // 1 MiB - max size of the CAR transfer request
readDeadline = 10 * time.Second
writeDeadline = 30 * time.Minute
)

// CarTransferRequest is the request sent by the client to transfer a CAR file
// for the given root and selector
type CarTransferRequest struct {
Root string // base64 encoded byte array
Selector string // base 64 encoded byte array
}

type dagTraversalRequest struct {
root cid.Cid
selector ipld.Node
}

// Libp2pCARServer serves CAR files for a given root and selector over the libp2p CarTransferProtocol.
type Libp2pCARServer struct {
ctx context.Context
cancel context.CancelFunc
h host.Host
}

func New(h host.Host) *Libp2pCARServer {
ctx, cancel := context.WithCancel(context.Background())
return &Libp2pCARServer{
ctx: ctx,
cancel: cancel,
h: h,
}
}

func (l *Libp2pCARServer) Start() {
l.h.SetStreamHandler(protocol.ID(CarTransferProtocol), l.Serve)
}

func (l *Libp2pCARServer) Stop() {
l.cancel()
l.h.RemoveStreamHandler(protocol.ID(CarTransferProtocol))
}

func (l *Libp2pCARServer) Serve(s network.Stream) {
defer s.Close()

// Set a deadline on reading from the stream so it does NOT hang
_ = s.SetReadDeadline(time.Now().Add(readDeadline))
defer s.SetReadDeadline(time.Time{}) // nolint

reqBz, err := ioutil.ReadAll(io.LimitReader(s, network.MessageSizeMax))
if err != nil {
return
}
// read the json car transfer request
var req CarTransferRequest
if err := json.Unmarshal(reqBz, &req); err != nil {
return
}
dr, err := carRequestToDAGRequest(&req)
if err != nil {
return
}

// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(writeDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint

// TODO Map root to CAR files
from, err := blockstore.OpenReadOnly("../testdata/files/sample-v1.car")
if err != nil {
return
}
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: from}
ls.SetReadStorage(&bsa)

bf := bufio.NewWriter(s)
defer bf.Flush()

_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, bf)
if err != nil {
return
}
// TODO record sent bytes
}

func carRequestToDAGRequest(req *CarTransferRequest) (*dagTraversalRequest, error) {
rootbz, err := base64.StdEncoding.DecodeString(req.Root)
if err != nil {
return nil, fmt.Errorf("failed to decode root: %s", err)
}
rootcid, err := cid.Cast(rootbz)
if err != nil {
return nil, fmt.Errorf("failed to cast root to cid: %s", err)
}

selbz, err := base64.StdEncoding.DecodeString(req.Selector)
if err != nil {
return nil, fmt.Errorf("failed to decode selector: %s", err)
}
sel, err := decodeSelector(selbz)
if err != nil {
return nil, fmt.Errorf("failed to decode selector to ipld node: %s", err)
}

return &dagTraversalRequest{
root: rootcid,
selector: sel,
}, nil
}

func decodeSelector(sel []byte) (ipld.Node, error) {
nb := basicnode.Prototype.Any.NewBuilder()
if err := dagcbor.Decode(nb, bytes.NewReader(sel)); err != nil {
return nil, err
}
return nb.Build(), nil
}
69 changes: 69 additions & 0 deletions libp2pcarserver/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package libp2pcarserver

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"io/ioutil"
"testing"
"time"

"github.com/ipld/go-ipld-prime/codec/dagcbor"

"github.com/ipld/go-car/v2/blockstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"

"github.com/libp2p/go-libp2p-core/peer"

mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
)

func TestSimpleTransfer(t *testing.T) {
ctx := context.Background()
mn := mocknet.New()

p1, err := mn.GenPeer()
require.NoError(t, err)
p2, err := mn.GenPeer()
require.NoError(t, err)
l := New(p2)
l.Start()

require.NoError(t, mn.LinkAll())

p1.Peerstore().AddAddrs(p2.ID(), p2.Addrs(), 1*time.Hour)
require.NoError(t, p1.Connect(ctx, peer.AddrInfo{ID: p2.ID()}))

s, err := p1.NewStream(ctx, p2.ID(), CarTransferProtocol)
require.NoError(t, err)

from, err := blockstore.OpenReadOnly("../testdata/files/sample-v1.car")
require.NoError(t, err)
rts, err := from.Roots()
require.NoError(t, err)
rtbz := rts[0].Bytes()

bf := bytes.Buffer{}
require.NoError(t, dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, &bf))

req := CarTransferRequest{
Root: base64.StdEncoding.EncodeToString(rtbz),
Selector: base64.StdEncoding.EncodeToString(bf.Bytes()),
}
reqBz, err := json.Marshal(req)
require.NoError(t, err)
_, err = s.Write(reqBz)
require.NoError(t, err)
require.NoError(t, s.CloseWrite())

resp, err := ioutil.ReadAll(s)
require.NoError(t, err)
require.NotEmpty(t, resp)

// ensure contents match
fbz, err := ioutil.ReadFile("../testdata/files/sample-v1.car")
require.NoError(t, err)
require.EqualValues(t, fbz, resp)
}
Binary file added testdata/files/junk.dat
Binary file not shown.
Binary file added testdata/files/sample-rw-bs-v2.car
Binary file not shown.
Binary file added testdata/files/sample-v1.car
Binary file not shown.
Binary file added testdata/files/sample-wrapped-v2.car
Binary file not shown.
66 changes: 66 additions & 0 deletions testdata/testdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package testdata

import (
"bytes"
"embed"
"fmt"

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
)

const (
FSPathCarV1 = "files/sample-v1.car"
FSPathCarV2 = "files/sample-wrapped-v2.car"
FSPathCarV22 = "files/sample-rw-bs-v2.car"
FSPathJunk = "files/junk.dat"

RootPathCarV1 = "testdata/files/sample-v1.car"
RootPathCarV2 = "testdata/files/sample-wrapped-v2.car"
RootPathJunk = "testdata/files/funk.dat"
)

var (
//go:embed files/*
FS embed.FS

CarV1 []byte
CarV2 []byte
Junk []byte

// RootCID is the root CID of the carv2 for testing.
RootCID cid.Cid
)

func init() {
var err error
CarV1, err = FS.ReadFile(FSPathCarV1)
if err != nil {
panic(err)
}

CarV2, err = FS.ReadFile(FSPathCarV2)
if err != nil {
panic(err)
}

Junk, err = FS.ReadFile(FSPathJunk)
if err != nil {
panic(err)
}

reader, err := car.NewReader(bytes.NewReader(CarV2))
if err != nil {
panic(fmt.Errorf("failed to parse carv2: %w", err))
}
defer reader.Close()

roots, err := reader.Roots()
if err != nil {
panic(fmt.Errorf("failed to obtain carv2 roots: %w", err))
}
if len(roots) == 0 {
panic("carv2 has no roots")
}
RootCID = roots[0]
}

0 comments on commit 076ddd9

Please sign in to comment.