Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
integrate car store
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed May 12, 2022
1 parent 076ddd9 commit 81e51e4
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 78 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ go 1.16

require (
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.3.3
github.com/ipld/go-car/v2 v2.1.2-0.20220429070120-51b5cbdd49db
github.com/ipld/go-ipld-prime v0.16.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73
github.com/libp2p/go-libp2p v0.19.2
github.com/libp2p/go-libp2p-core v0.15.1
github.com/stretchr/testify v1.7.0
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns=
github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -277,6 +278,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
Expand Down Expand Up @@ -357,11 +359,13 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/go-bitswap v0.5.1/go.mod h1:P+ckC87ri1xFLvk74NlXdP0Kj9RmWAh4+H78sC6Qopo=
github.com/ipfs/go-bitswap v0.6.0 h1:f2rc6GZtoSFhEIzQmddgGiel9xntj02Dg0ZNf2hSC+w=
github.com/ipfs/go-bitswap v0.6.0/go.mod h1:Hj3ZXdOC5wBJvENtdqsixmzzRukqd8EHLxZLZc3mzRA=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
github.com/ipfs/go-block-format v0.0.3 h1:r8t66QstRp/pd/or4dpnbVfXT5Gt7lOqRvC+/dDTpMc=
github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk=
github.com/ipfs/go-blockservice v0.2.1/go.mod h1:k6SiwmgyYgs4M/qt+ww6amPeUH9EISLRBnvUurKJhi8=
github.com/ipfs/go-blockservice v0.3.0 h1:cDgcZ+0P0Ih3sl8+qjFr2sVaMdysg/YZpLj5WJ8kiiw=
github.com/ipfs/go-blockservice v0.3.0/go.mod h1:P5ppi8IHDC7O+pA0AlGTF09jruB2h+oP3wVVaZl8sfk=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down Expand Up @@ -393,6 +397,7 @@ github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIyk
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-blockstore v0.2.1/go.mod h1:jGesd8EtCM3/zPgx+qr0/feTXGUeRai6adgwC+Q+JvE=
github.com/ipfs/go-ipfs-blockstore v1.1.2/go.mod h1:w51tNR9y5+QXB0wkNcHt4O2aSZjTdqaEWaQdSxEyUOY=
github.com/ipfs/go-ipfs-blockstore v1.2.0 h1:n3WTeJ4LdICWs/0VSfjHrlqpPpl6MZ+ySd3j8qz0ykw=
github.com/ipfs/go-ipfs-blockstore v1.2.0/go.mod h1:eh8eTFLiINYNSNawfZOC7HOxNTxpB1PFuA5E1m/7exE=
Expand All @@ -406,6 +411,7 @@ github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR9
github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0 h1:TiMekCrOGQuWYtZO3mf4YJXDIdNgnKWZ9IE3fGlnWfo=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI=
github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT0mO7jG2cbJYatp3HPk5XY=
github.com/ipfs/go-ipfs-exchange-offline v0.2.0 h1:2PF4o4A7W656rC0RxuhUace997FTcDTcIQ6NoEtyjAI=
github.com/ipfs/go-ipfs-exchange-offline v0.2.0/go.mod h1:HjwBeW0dvZvfOMwDP0TSKXIHf2s+ksdP4E3MLDRtLKY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
Expand Down Expand Up @@ -438,6 +444,7 @@ github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72g
github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/ipfs/go-merkledag v0.5.1/go.mod h1:cLMZXx8J08idkp5+id62iVftUQV+HlYJ3PIhDfZsjA4=
github.com/ipfs/go-merkledag v0.6.0 h1:oV5WT2321tS4YQVOPgIrWHvJ0lJobRTerU+i9nmUCuA=
github.com/ipfs/go-merkledag v0.6.0/go.mod h1:9HSEwRd5sV+lbykiYP+2NC/3o6MZbKNaa4hfNcH5iH0=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
Expand All @@ -446,13 +453,16 @@ github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5s
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-car v0.3.3 h1:D6y+jvg9h2ZSv7GLUMWUwg5VTLy1E7Ak+uQw5orOg3I=
github.com/ipld/go-car v0.3.3/go.mod h1:/wkKF4908ULT4dFIFIUZYcfjAnj+KFnJvlh8Hsz1FbQ=
github.com/ipld/go-car/v2 v2.1.2-0.20220429070120-51b5cbdd49db h1:wO0PaXdWWjw1ux6kKgbqyLGcgJHbKcreo7m8CO1Wyvo=
github.com/ipld/go-car/v2 v2.1.2-0.20220429070120-51b5cbdd49db/go.mod h1:pcqXhtO94vxsx3vwpKZBqeZTTMot1SIBJunmextsqM0=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-codec-dagpb v1.3.1 h1:yVNlWRQexCa54ln3MSIiUN++ItH7pdhBFhh0hSgZu1w=
github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY=
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.14.3-0.20211207234443-319145880958/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.16.0 h1:RS5hhjB/mcpeEPJvfyj0qbOj/QL+/j05heZ0qa97dVo=
github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ=
Expand Down
109 changes: 34 additions & 75 deletions libp2pcarserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,79 +2,67 @@ package libp2pcarserver

import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
logging "github.com/ipfs/go-log/v2"

bstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/filecoin-project/saturn-l2/store"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/libp2p/go-libp2p-core/host"

"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"

"github.com/ipld/go-car/v2"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
)

const CarTransferProtocol = "/saturn/l2/car/1.0" // car transfer protocol
const CARTransferProtocol = "/saturn/l2/car/1.0" // car transfer protocol
var log = logging.Logger("car-transfer")

var (
maxRequestSize = 1048576 // 1 MiB - max size of the CAR transfer request
readDeadline = 10 * time.Second
writeDeadline = 30 * time.Minute
)

// CarTransferRequest is the request sent by the client to transfer a CAR file
// for the given root and selector
type CarTransferRequest struct {
Root string // base64 encoded byte array
Selector string // base 64 encoded byte array
}

type dagTraversalRequest struct {
root cid.Cid
selector ipld.Node
}

// Libp2pCARServer serves CAR files for a given root and selector over the libp2p CarTransferProtocol.
// Libp2pCARServer serves CAR files for a given root and selector over the libp2p CARTransferProtocol.
type Libp2pCARServer struct {
ctx context.Context
cancel context.CancelFunc
h host.Host

cs *store.CARStore
}

func New(h host.Host) *Libp2pCARServer {
func New(h host.Host, cs *store.CARStore) *Libp2pCARServer {
ctx, cancel := context.WithCancel(context.Background())
return &Libp2pCARServer{
ctx: ctx,
cancel: cancel,
h: h,
cs: cs,
}
}

func (l *Libp2pCARServer) Start() {
l.h.SetStreamHandler(protocol.ID(CarTransferProtocol), l.Serve)
l.h.SetStreamHandler(protocol.ID(CARTransferProtocol), l.serveCARFile)
}

func (l *Libp2pCARServer) Stop() {
l.cancel()
l.h.RemoveStreamHandler(protocol.ID(CarTransferProtocol))
l.h.RemoveStreamHandler(protocol.ID(CARTransferProtocol))
}

func (l *Libp2pCARServer) Serve(s network.Stream) {
func (l *Libp2pCARServer) serveCARFile(s network.Stream) {
defer s.Close()

// Set a deadline on reading from the stream so it does NOT hang
Expand All @@ -85,68 +73,39 @@ func (l *Libp2pCARServer) Serve(s network.Stream) {
if err != nil {
return
}

// read the json car transfer request
var req CarTransferRequest
var req CARTransferRequest
if err := json.Unmarshal(reqBz, &req); err != nil {
return
}
dr, err := carRequestToDAGRequest(&req)
if err != nil {
return
}
log.Debugw("car transfer request", "base64-root", req.Root, "base64-selector", req.Selector)

// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(writeDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint

// TODO Map root to CAR files
from, err := blockstore.OpenReadOnly("../testdata/files/sample-v1.car")
if err != nil {
return
if err := l.cs.WithStore(dr.root, func(ro bstore.Blockstore) error {
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: ro}
ls.SetReadStorage(&bsa)

bf := bufio.NewWriter(s)
defer bf.Flush()

_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, bf)
if err != nil {
return err
}
return nil
}); err != nil {
log.Errorw("car transfer failed", "base64-root", req.Root, "base64-selector", req.Selector, "err", err)
}
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: from}
ls.SetReadStorage(&bsa)

bf := bufio.NewWriter(s)
defer bf.Flush()
log.Debugw("car transfer successful", "base64-root", req.Root, "base64-selector", req.Selector)

_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, bf)
if err != nil {
return
}
// TODO record sent bytes
}

func carRequestToDAGRequest(req *CarTransferRequest) (*dagTraversalRequest, error) {
rootbz, err := base64.StdEncoding.DecodeString(req.Root)
if err != nil {
return nil, fmt.Errorf("failed to decode root: %s", err)
}
rootcid, err := cid.Cast(rootbz)
if err != nil {
return nil, fmt.Errorf("failed to cast root to cid: %s", err)
}

selbz, err := base64.StdEncoding.DecodeString(req.Selector)
if err != nil {
return nil, fmt.Errorf("failed to decode selector: %s", err)
}
sel, err := decodeSelector(selbz)
if err != nil {
return nil, fmt.Errorf("failed to decode selector to ipld node: %s", err)
}

return &dagTraversalRequest{
root: rootcid,
selector: sel,
}, nil
}

func decodeSelector(sel []byte) (ipld.Node, error) {
nb := basicnode.Prototype.Any.NewBuilder()
if err := dagcbor.Decode(nb, bytes.NewReader(sel)); err != nil {
return nil, err
}
return nb.Build(), nil
}
30 changes: 27 additions & 3 deletions libp2pcarserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ import (
"encoding/base64"
"encoding/json"
"io/ioutil"
"os"
"testing"
"time"

bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car"

"github.com/filecoin-project/saturn-l2/store"

"github.com/ipld/go-ipld-prime/codec/dagcbor"

"github.com/ipld/go-car/v2/blockstore"
Expand All @@ -28,27 +34,45 @@ func TestSimpleTransfer(t *testing.T) {
require.NoError(t, err)
p2, err := mn.GenPeer()
require.NoError(t, err)
l := New(p2)
cs, err := store.NewCARStore(t.TempDir())
require.NoError(t, err)
l := New(p2, cs)
l.Start()

require.NoError(t, mn.LinkAll())

p1.Peerstore().AddAddrs(p2.ID(), p2.Addrs(), 1*time.Hour)
require.NoError(t, p1.Connect(ctx, peer.AddrInfo{ID: p2.ID()}))

s, err := p1.NewStream(ctx, p2.ID(), CarTransferProtocol)
s, err := p1.NewStream(ctx, p2.ID(), CARTransferProtocol)
require.NoError(t, err)

from, err := blockstore.OpenReadOnly("../testdata/files/sample-v1.car")
require.NoError(t, err)
rts, err := from.Roots()
require.NoError(t, err)
require.NoError(t, from.Close())
// add the car file to the car store
require.NoError(t, cs.Create(rts[0], rts[0], func(bs bstore.Blockstore) error {
f, err := os.Open("../testdata/files/sample-v1.car")
if err != nil {
return err
}
if _, err := car.LoadCar(ctx, bs, f); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return nil
}))

rtbz := rts[0].Bytes()

bf := bytes.Buffer{}
require.NoError(t, dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, &bf))

req := CarTransferRequest{
req := CARTransferRequest{
Root: base64.StdEncoding.EncodeToString(rtbz),
Selector: base64.StdEncoding.EncodeToString(bf.Bytes()),
}
Expand Down
57 changes: 57 additions & 0 deletions libp2pcarserver/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package libp2pcarserver

import (
"bytes"
"encoding/base64"
"fmt"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

// CARTransferRequest is the request sent by the client to transfer a CAR file
// for the given root and selector
type CARTransferRequest struct {
Root string // base64 encoded byte array
Selector string // base 64 encoded byte array
}

type dagTraversalRequest struct {
root cid.Cid
selector ipld.Node
}

func carRequestToDAGRequest(req *CARTransferRequest) (*dagTraversalRequest, error) {
rootbz, err := base64.StdEncoding.DecodeString(req.Root)
if err != nil {
return nil, fmt.Errorf("failed to decode root: %s", err)
}
rootcid, err := cid.Cast(rootbz)
if err != nil {
return nil, fmt.Errorf("failed to cast root to cid: %s", err)
}

selbz, err := base64.StdEncoding.DecodeString(req.Selector)
if err != nil {
return nil, fmt.Errorf("failed to decode selector: %s", err)
}
sel, err := decodeSelector(selbz)
if err != nil {
return nil, fmt.Errorf("failed to decode selector to ipld node: %s", err)
}

return &dagTraversalRequest{
root: rootcid,
selector: sel,
}, nil
}

func decodeSelector(sel []byte) (ipld.Node, error) {
nb := basicnode.Prototype.Any.NewBuilder()
if err := dagcbor.Decode(nb, bytes.NewReader(sel)); err != nil {
return nil, err
}
return nb.Build(), nil
}
Loading

0 comments on commit 81e51e4

Please sign in to comment.