Skip to content

Commit

Permalink
[WIP] Extract DAGService and friends
Browse files Browse the repository at this point in the history
**DO NOT MERGE!**

Refactor IPFS to use: ipfs/go-ipld-format#8

Context: IPLD isn't currently very usable from libp2p because interfaces like
the DAGService are bundled into go-ipfs. This PR extracts these interfaces and
some of the related helper functions.

License: MIT
Signed-off-by: Steven Allen <[email protected]>
  • Loading branch information
Stebalien committed Jul 12, 2017
1 parent 4316c97 commit 0ffa68e
Show file tree
Hide file tree
Showing 46 changed files with 183 additions and 357 deletions.
4 changes: 4 additions & 0 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ type Session struct {
ses exchange.Fetcher
}

func (s *Session) Blockstore() blockstore.Blockstore {
return s.bs
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
return getBlock(ctx, c, s.bs, s.ses)
Expand Down
3 changes: 2 additions & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
ipdht "gx/ipfs/QmTHyAbD9KzGrseLNzmEoNkVxA8F2h7LQG2iV6uhBqs6kX/go-libp2p-kad-dht"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
)

Expand Down Expand Up @@ -374,7 +375,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
return nil
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, cids []*cid.Cid) error {
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv node.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
Expand Down
2 changes: 1 addition & 1 deletion core/commands/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func statGetFormatOptions(req cmds.Request) (string, error) {
}
}

func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
func statNode(ds node.DAGService, fsn mfs.FSNode) (*Object, error) {
nd, err := fsn.GetNode()
if err != nil {
return nil, err
Expand Down
7 changes: 4 additions & 3 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
context "context"
u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
)

var PinCmd = &cmds.Command{
Expand Down Expand Up @@ -548,7 +549,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
err := dag.EnumerateChildren(n.Context(), dag.GetLinksWithDAG(n.DAG), k, set.Visit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -587,7 +588,7 @@ type pinVerifyOpts struct {

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} {
visited := make(map[string]PinStatus)
getLinks := n.DAG.GetOfflineLinkService().GetLinks
ng := n.DAG.OfflineNodeGetter()
recPins := n.Pinning.RecursiveKeys()

var checkPin func(root *cid.Cid) PinStatus
Expand All @@ -597,7 +598,7 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan
return status
}

links, err := getLinks(ctx, root)
links, err := node.GetLinks(ctx, ng, root)
if err != nil {
status := PinStatus{Ok: false}
if opts.explain {
Expand Down
5 changes: 2 additions & 3 deletions core/commands/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"

u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
Expand Down Expand Up @@ -220,7 +219,7 @@ type RefWrapper struct {

type RefWriter struct {
out chan interface{}
DAG dag.DAGService
DAG node.DAGService
Ctx context.Context

Unique bool
Expand All @@ -242,7 +241,7 @@ func (rw *RefWriter) writeRefsRecursive(n node.Node) (int, error) {
nc := n.Cid()

var count int
for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
for i, ng := range node.GetDAG(rw.Ctx, rw.DAG, n) {
lc := n.Links()[i].Cid
if rw.skip(lc) {
continue
Expand Down
3 changes: 2 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
dht "gx/ipfs/QmTHyAbD9KzGrseLNzmEoNkVxA8F2h7LQG2iV6uhBqs6kX/go-libp2p-kad-dht"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
metrics "gx/ipfs/QmVjRAPfRtResCMCE4eBqr4Beoa6A89P1YweG9wUS6RqUL/go-libp2p-metrics"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
Expand Down Expand Up @@ -114,7 +115,7 @@ type IpfsNode struct {
BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
DAG node.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
Expand Down
6 changes: 3 additions & 3 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type AddedObject struct {
Bytes int64 `json:",omitempty"`
}

func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) {
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds node.DAGService) (*Adder, error) {
return &Adder{
ctx: ctx,
pinning: p,
Expand All @@ -90,7 +90,7 @@ type Adder struct {
ctx context.Context
pinning pin.Pinner
blockstore bstore.GCBlockstore
dagService dag.DAGService
dagService node.DAGService
Out chan interface{}
Progress bool
Hidden bool
Expand Down Expand Up @@ -548,7 +548,7 @@ func outputDagnode(out chan interface{}, name string, dn node.Node) error {
return nil
}

func NewMemoryDagService() dag.DAGService {
func NewMemoryDagService() node.DAGService {
// build mem-datastore for editor's intermediary nodes
bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
bsrv := bserv.New(bs, offline.Exchange(bs))
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestAddGCLive(t *testing.T) {
defer cancel()

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
err = dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(node.DAG), last, set.Visit)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion core/coreunix/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
context "context"
u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
dssync "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync"
)

func getDagserv(t *testing.T) merkledag.DAGService {
func getDagserv(t *testing.T) node.DAGService {
db := dssync.MutexWrap(ds.NewMapDatastore())
bs := bstore.NewBlockstore(db)
blockserv := bserv.New(bs, offline.Exchange(bs))
Expand Down
5 changes: 3 additions & 2 deletions importer/balanced/balanced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (

"context"
u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
)

// TODO: extract these tests and more as a generic layout test suite

func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error) {
func buildTestDag(ds node.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error) {
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Expand All @@ -34,7 +35,7 @@ func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error)
return nd.(*dag.ProtoNode), nil
}

func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.ProtoNode, []byte) {
func getTestDag(t *testing.T, ds node.DAGService, size int64, blksize int64) (*dag.ProtoNode, []byte) {
data := make([]byte, size)
u.NewTimeSeededRand().Read(data)
r := bytes.NewReader(data)
Expand Down
10 changes: 5 additions & 5 deletions importer/helpers/dagbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
dserv dag.DAGService
dserv node.DAGService
spl chunk.Splitter
recvdErr error
rawLeaves bool
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
batch *node.Batch
fullPath string
stat os.FileInfo
prefix *cid.Prefix
Expand All @@ -40,7 +40,7 @@ type DagBuilderParams struct {
Prefix *cid.Prefix

// DAGService to write blocks to (required)
Dagserv dag.DAGService
Dagserv node.DAGService

// NoCopy signals to the chunker that it should track fileinfo for
// filestore adds
Expand All @@ -56,7 +56,7 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
rawLeaves: dbp.RawLeaves,
prefix: dbp.Prefix,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
batch: node.Batching(dbp.Dagserv),
}
if fi, ok := spl.Reader().(files.FileInfo); dbp.NoCopy && ok {
db.fullPath = fi.AbsPath()
Expand Down Expand Up @@ -106,7 +106,7 @@ func (db *DagBuilderHelper) Next() ([]byte, error) {
}

// GetDagServ returns the dagservice object this Helper is using
func (db *DagBuilderHelper) GetDagServ() dag.DAGService {
func (db *DagBuilderHelper) GetDagServ() node.DAGService {
return db.dserv
}

Expand Down
2 changes: 1 addition & 1 deletion importer/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (n *UnixfsNode) Set(other *UnixfsNode) {
}
}

func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds dag.DAGService) (*UnixfsNode, error) {
func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds node.NodeGetter) (*UnixfsNode, error) {
nd, err := n.node.Links()[i].GetNode(ctx, ds)
if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import (
"github.com/ipfs/go-ipfs/importer/chunk"
h "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag"

node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
)

// Builds a DAG from the given file, writing created blocks to disk as they are
// created
func BuildDagFromFile(fpath string, ds dag.DAGService) (node.Node, error) {
func BuildDagFromFile(fpath string, ds node.DAGService) (node.Node, error) {
stat, err := os.Lstat(fpath)
if err != nil {
return nil, err
Expand All @@ -37,7 +36,7 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (node.Node, error) {
return BuildDagFromReader(ds, chunk.NewSizeSplitter(f, chunk.DefaultBlockSize))
}

func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
func BuildDagFromReader(ds node.DAGService, spl chunk.Splitter) (node.Node, error) {
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Expand All @@ -46,7 +45,7 @@ func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error
return bal.BalancedLayout(dbp.New(spl))
}

func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
func BuildTrickleDagFromReader(ds node.DAGService, spl chunk.Splitter) (node.Node, error) {
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Expand Down
7 changes: 3 additions & 4 deletions importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"testing"

chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
uio "github.com/ipfs/go-ipfs/unixfs/io"

u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
)

func getBalancedDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAGService) {
func getBalancedDag(t testing.TB, size int64, blksize int64) (node.Node, node.DAGService) {
ds := mdtest.Mock()
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildDagFromReader(ds, chunk.NewSizeSplitter(r, blksize))
Expand All @@ -26,7 +25,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAG
return nd, ds
}

func getTrickleDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAGService) {
func getTrickleDag(t testing.TB, size int64, blksize int64) (node.Node, node.DAGService) {
ds := mdtest.Mock()
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildTrickleDagFromReader(ds, chunk.NewSizeSplitter(r, blksize))
Expand Down Expand Up @@ -102,7 +101,7 @@ func BenchmarkTrickleReadFull(b *testing.B) {
runReadBench(b, nd, ds)
}

func runReadBench(b *testing.B, nd node.Node, ds dag.DAGService) {
func runReadBench(b *testing.B, nd node.Node, ds node.DAGService) {
for i := 0; i < b.N; i++ {
ctx, cancel := context.WithCancel(context.Background())
read, err := uio.NewDagReader(ctx, nd, ds)
Expand Down
3 changes: 2 additions & 1 deletion importer/trickle/trickle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
uio "github.com/ipfs/go-ipfs/unixfs/io"

u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
node "gx/ipfs/QmVHxZ8ovAuHiHTbJa68budGYAqmMUzb1bqDW1SVb6y5M9/go-ipld-format"
)

func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.ProtoNode, error) {
func buildTestDag(ds node.DAGService, spl chunk.Splitter) (*merkledag.ProtoNode, error) {
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Expand Down
4 changes: 2 additions & 2 deletions importer/trickle/trickledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func trickleDepthInfo(node *h.UnixfsNode, maxlinks int) (int, int) {

// VerifyTrickleDagStructure checks that the given dag matches exactly the trickle dag datastructure
// layout
func VerifyTrickleDagStructure(nd node.Node, ds dag.DAGService, direct int, layerRepeat int) error {
func VerifyTrickleDagStructure(nd node.Node, ds node.NodeGetter, direct int, layerRepeat int) error {
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return dag.ErrNotProtobuf
Expand All @@ -246,7 +246,7 @@ func VerifyTrickleDagStructure(nd node.Node, ds dag.DAGService, direct int, laye
}

// Recursive call for verifying the structure of a trickledag
func verifyTDagRec(nd *dag.ProtoNode, depth, direct, layerRepeat int, ds dag.DAGService) error {
func verifyTDagRec(nd *dag.ProtoNode, depth, direct, layerRepeat int, ds node.NodeGetter) error {
if depth == 0 {
// zero depth dag is raw data block
if len(nd.Links()) > 0 {
Expand Down
Loading

0 comments on commit 0ffa68e

Please sign in to comment.