diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index fc90e969f03..6d3d920d2fc 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -20,6 +20,7 @@ import ( coreapi "github.com/ipfs/go-ipfs/core/coreapi" corehttp "github.com/ipfs/go-ipfs/core/corehttp" corerepo "github.com/ipfs/go-ipfs/core/corerepo" + "github.com/ipfs/go-ipfs/core/node" nodeMount "github.com/ipfs/go-ipfs/fuse/node" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations" @@ -323,11 +324,11 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment case routingOptionSupernodeKwd: return errors.New("supernode routing was never fully implemented and has been removed") case routingOptionDHTClientKwd: - ncfg.Routing = core.DHTClientOption + ncfg.Routing = node.DHTClientOption case routingOptionDHTKwd: - ncfg.Routing = core.DHTOption + ncfg.Routing = node.DHTOption case routingOptionNoneKwd: - ncfg.Routing = core.NilRouterOption + ncfg.Routing = node.NilRouterOption default: return fmt.Errorf("unrecognized routing option: %s", routingOption) } diff --git a/core/bootstrap.go b/core/bootstrap/bootstrap.go similarity index 77% rename from core/bootstrap.go rename to core/bootstrap/bootstrap.go index 5ff4f8e14db..e6b4f826dad 100644 --- a/core/bootstrap.go +++ b/core/bootstrap/bootstrap.go @@ -1,4 +1,4 @@ -package core +package bootstrap import ( "context" @@ -9,19 +9,23 @@ import ( "sync" "time" - math2 "github.com/ipfs/go-ipfs/thirdparty/math2" - lgbl "github.com/libp2p/go-libp2p-loggables" - config "github.com/ipfs/go-ipfs-config" - goprocess "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" - periodicproc "github.com/jbenet/goprocess/periodic" - host "github.com/libp2p/go-libp2p-host" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" + "github.com/ipfs/go-ipfs-config" + logging "github.com/ipfs/go-log" + "github.com/jbenet/goprocess" + "github.com/jbenet/goprocess/context" + "github.com/jbenet/goprocess/periodic" + "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p-loggables" + "github.com/libp2p/go-libp2p-net" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-routing" + + "github.com/ipfs/go-ipfs/thirdparty/math2" ) +var log = logging.Logger("bootstrap") + // ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap // peers to bootstrap correctly. var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") @@ -29,7 +33,6 @@ var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to boots // BootstrapConfig specifies parameters used in an IpfsNode's network // bootstrapping process. type BootstrapConfig struct { - // MinPeerThreshold governs whether to bootstrap more connections. If the // node has less open connections than this number, it will open connections // to the bootstrap nodes. From there, the routing system should be able @@ -50,7 +53,7 @@ type BootstrapConfig struct { // BootstrapPeers is a function that returns a set of bootstrap peers // for the bootstrap process to use. This makes it possible for clients // to control the peers the process uses at any moment. - BootstrapPeers func() []pstore.PeerInfo + BootstrapPeers func() []peerstore.PeerInfo } // DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
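Note on usage: together with the reworked `Bootstrap` signature in the hunks below (it now takes the peer ID, host, and router explicitly instead of the whole `*IpfsNode`), the relocated package can be driven by anything that has those three pieces. A minimal sketch, assuming `n` is an already-constructed node; the `MinPeerThreshold` override is purely illustrative:

    import (
        "io"

        "github.com/ipfs/go-ipfs/core"
        "github.com/ipfs/go-ipfs/core/bootstrap"
    )

    // startBootstrap kicks off the connection supervisor for an already-built node.
    // Closing the returned io.Closer stops the periodic bootstrap rounds.
    func startBootstrap(n *core.IpfsNode) (io.Closer, error) {
        cfg := bootstrap.DefaultBootstrapConfig
        cfg.MinPeerThreshold = 8 // redial whenever we hold fewer than 8 open connections
        return bootstrap.Bootstrap(n.Identity, n.PeerHost, n.Routing, cfg)
    }

This mirrors the call `IpfsNode.Bootstrap` makes into the new package in core/core.go further down.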
@@ -60,9 +63,9 @@ var DefaultBootstrapConfig = BootstrapConfig{ ConnectionTimeout: (30 * time.Second) / 3, // Period / 3 } -func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig { +func BootstrapConfigWithPeers(pis []peerstore.PeerInfo) BootstrapConfig { cfg := DefaultBootstrapConfig - cfg.BootstrapPeers = func() []pstore.PeerInfo { + cfg.BootstrapPeers = func() []peerstore.PeerInfo { return pis } return cfg @@ -72,7 +75,7 @@ func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig { // check the number of open connections and -- if there are too few -- initiate // connections to well-known bootstrap peers. It also kicks off subsystem // bootstrapping (i.e. routing). -func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { +func Bootstrap(id peer.ID, host host.Host, rt routing.IpfsRouting, cfg BootstrapConfig) (io.Closer, error) { // make a signal to wait for one bootstrap round to complete. doneWithRound := make(chan struct{}) @@ -85,12 +88,12 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // the periodic bootstrap function -- the connection supervisor periodic := func(worker goprocess.Process) { - ctx := procctx.OnClosingContext(worker) - defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() + ctx := goprocessctx.OnClosingContext(worker) + defer log.EventBegin(ctx, "periodicBootstrap", id).Done() - if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil { - log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) - log.Debugf("%s bootstrap error: %s", n.Identity, err) + if err := bootstrapRound(ctx, host, cfg); err != nil { + log.Event(ctx, "bootstrapError", id, loggables.Error(err)) + log.Debugf("%s bootstrap error: %s", id, err) } <-doneWithRound @@ -101,9 +104,9 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { proc.Go(periodic) // run one right now. // kick off Routing.Bootstrap - if n.Routing != nil { - ctx := procctx.OnClosingContext(proc) - if err := n.Routing.Bootstrap(ctx); err != nil { + if rt != nil { + ctx := goprocessctx.OnClosingContext(proc) + if err := rt.Bootstrap(ctx); err != nil { proc.Close() return nil, err } @@ -134,9 +137,9 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er numToDial := cfg.MinPeerThreshold - len(connected) // filter out bootstrap nodes we are already connected to - var notConnected []pstore.PeerInfo + var notConnected []peerstore.PeerInfo for _, p := range peers { - if host.Network().Connectedness(p.ID) != inet.Connected { + if host.Network().Connectedness(p.ID) != net.Connected { notConnected = append(notConnected, p) } } @@ -155,7 +158,7 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er return bootstrapConnect(ctx, host, randSubset) } -func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo) error { +func bootstrapConnect(ctx context.Context, ph host.Host, peers []peerstore.PeerInfo) error { if len(peers) < 1 { return ErrNotEnoughBootstrapPeers } @@ -170,12 +173,12 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo // Also, performed asynchronously for dial speed.
wg.Add(1) - go func(p pstore.PeerInfo) { + go func(p peerstore.PeerInfo) { defer wg.Done() defer log.EventBegin(ctx, "bootstrapDial", ph.ID(), p.ID).Done() log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID) - ph.Peerstore().AddAddrs(p.ID, p.Addrs, pstore.PermanentAddrTTL) + ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) if err := ph.Connect(ctx, p); err != nil { log.Event(ctx, "bootstrapDialFailed", p.ID) log.Debugf("failed to bootstrap with %v: %s", p.ID, err) @@ -204,12 +207,26 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo return nil } -func toPeerInfos(bpeers []config.BootstrapPeer) []pstore.PeerInfo { - pinfos := make(map[peer.ID]*pstore.PeerInfo) +func randomSubsetOfPeers(in []peerstore.PeerInfo, max int) []peerstore.PeerInfo { + n := math2.IntMin(max, len(in)) + var out []peerstore.PeerInfo + for _, val := range rand.Perm(len(in)) { + out = append(out, in[val]) + if len(out) >= n { + break + } + } + return out +} + +type Peers []config.BootstrapPeer + +func (bpeers Peers) ToPeerInfos() []peerstore.PeerInfo { + pinfos := make(map[peer.ID]*peerstore.PeerInfo) for _, bootstrap := range bpeers { pinfo, ok := pinfos[bootstrap.ID()] if !ok { - pinfo = new(pstore.PeerInfo) + pinfo = new(peerstore.PeerInfo) pinfos[bootstrap.ID()] = pinfo pinfo.ID = bootstrap.ID() } @@ -217,22 +234,10 @@ func toPeerInfos(bpeers []config.BootstrapPeer) []pstore.PeerInfo { pinfo.Addrs = append(pinfo.Addrs, bootstrap.Transport()) } - var peers []pstore.PeerInfo + var peers []peerstore.PeerInfo for _, pinfo := range pinfos { peers = append(peers, *pinfo) } return peers } - -func randomSubsetOfPeers(in []pstore.PeerInfo, max int) []pstore.PeerInfo { - n := math2.IntMin(max, len(in)) - var out []pstore.PeerInfo - for _, val := range rand.Perm(len(in)) { - out = append(out, in[val]) - if len(out) >= n { - break - } - } - return out -} diff --git a/core/bootstrap_test.go b/core/bootstrap/bootstrap_test.go similarity index 95% rename from core/bootstrap_test.go rename to core/bootstrap/bootstrap_test.go index 51e85d8aac2..0c779985847 100644 --- a/core/bootstrap_test.go +++ b/core/bootstrap/bootstrap_test.go @@ -1,4 +1,4 @@ -package core +package bootstrap import ( "fmt" @@ -49,7 +49,7 @@ func TestMultipleAddrsPerPeer(t *testing.T) { bsps = append(bsps, bsp1, bsp2) } - pinfos := toPeerInfos(bsps) + pinfos := Peers.ToPeerInfos(bsps) if len(pinfos) != len(bsps)/2 { t.Fatal("expected fewer peers") } diff --git a/core/builder.go b/core/builder.go index 4a4c2423b80..02b772e705b 100644 --- a/core/builder.go +++ b/core/builder.go @@ -5,57 +5,24 @@ import ( "crypto/rand" "encoding/base64" "errors" - "os" - "syscall" "go.uber.org/fx" - "github.com/ipfs/go-ipfs/p2p" - "github.com/ipfs/go-ipfs/provider" + "github.com/ipfs/go-ipfs/core/bootstrap" + "github.com/ipfs/go-ipfs/core/node" repo "github.com/ipfs/go-ipfs/repo" ds "github.com/ipfs/go-datastore" dsync "github.com/ipfs/go-datastore/sync" cfg "github.com/ipfs/go-ipfs-config" - offline "github.com/ipfs/go-ipfs-exchange-offline" - offroute "github.com/ipfs/go-ipfs-routing/offline" metrics "github.com/ipfs/go-metrics-interface" resolver "github.com/ipfs/go-path/resolver" ci "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" ) -type BuildCfg struct { - // If online is set, the node will have networking enabled - Online bool - - // ExtraOpts is a map of extra options used to configure the ipfs nodes creation - ExtraOpts map[string]bool - - // If permanent then node should run more expensive 
processes - // that will improve performance in long run - Permanent bool - - // DisableEncryptedConnections disables connection encryption *entirely*. - // DO NOT SET THIS UNLESS YOU'RE TESTING. - DisableEncryptedConnections bool - - // If NilRepo is set, a Repo backed by a nil datastore will be constructed - NilRepo bool - - Routing RoutingOption - Host HostOption - Repo repo.Repo -} - -func (cfg *BuildCfg) getOpt(key string) bool { - if cfg.ExtraOpts == nil { - return false - } - - return cfg.ExtraOpts[key] -} +type BuildCfg node.BuildCfg func (cfg *BuildCfg) fillDefaults() error { if cfg.Repo != nil && cfg.NilRepo { @@ -77,11 +44,11 @@ func (cfg *BuildCfg) fillDefaults() error { } if cfg.Routing == nil { - cfg.Routing = DHTOption + cfg.Routing = node.DHTOption } if cfg.Host == nil { - cfg.Host = DefaultHostOption + cfg.Host = node.DefaultHostOption } return nil @@ -115,8 +82,6 @@ func defaultRepo(dstore repo.Datastore) (repo.Repo, error) { }, nil } -type MetricsCtx context.Context - // NewNode constructs and returns an IpfsNode using the given cfg. func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { if cfg == nil { @@ -141,12 +106,12 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { }) // TODO: Remove this, use only for passing node config - cfgOption := fx.Provide(func() *BuildCfg { - return cfg + cfgOption := fx.Provide(func() *node.BuildCfg { + return (*node.BuildCfg)(cfg) }) - metricsCtx := fx.Provide(func() MetricsCtx { - return MetricsCtx(ctx) + metricsCtx := fx.Provide(func() node.MetricsCtx { + return node.MetricsCtx(ctx) }) params := fx.Options( @@ -155,58 +120,12 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { metricsCtx, ) - storage := fx.Options( - fx.Provide(repoConfig), - fx.Provide(datastoreCtor), - fx.Provide(baseBlockstoreCtor), - fx.Provide(gcBlockstoreCtor), - ) - - ident := fx.Options( - fx.Provide(identity), - fx.Provide(privateKey), - fx.Provide(peerstore), - ) - - ipns := fx.Options( - fx.Provide(recordValidator), - ) - - providers := fx.Options( - fx.Provide(providerQueue), - fx.Provide(providerCtor), - fx.Provide(reproviderCtor), - - fx.Invoke(reprovider), - fx.Invoke(provider.Provider.Run), - ) - - online := fx.Options( - fx.Provide(onlineExchangeCtor), - fx.Provide(onlineNamesysCtor), - - fx.Invoke(ipnsRepublisher), - - fx.Provide(p2p.NewP2P), - - ipfsp2p, - providers, - ) - if !cfg.Online { - online = fx.Options( - fx.Provide(offline.Exchange), - fx.Provide(offlineNamesysCtor), - fx.Provide(offroute.NewOfflineRouter), - fx.Provide(provider.NewOfflineProvider), - ) - } - core := fx.Options( - fx.Provide(blockServiceCtor), - fx.Provide(dagCtor), + fx.Provide(node.BlockServiceCtor), + fx.Provide(node.DagCtor), fx.Provide(resolver.NewBasicResolver), - fx.Provide(pinning), - fx.Provide(files), + fx.Provide(node.Pinning), + fx.Provide(node.Files), ) n := &IpfsNode{ @@ -214,16 +133,16 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { } app := fx.New( + fx.NopLogger, fx.Provide(baseProcess), params, - storage, - ident, - ipns, - online, + node.Storage, + node.Identity, + node.IPNS, + node.Networked(cfg.Online), fx.Invoke(setupSharding), - fx.NopLogger, core, @@ -248,19 +167,10 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { return nil, err } - // TODO: DI-ify bootstrap + // TODO: How soon will bootstrap move to libp2p? 
if !cfg.Online { return n, nil } - return n, n.Bootstrap(DefaultBootstrapConfig) -} - -func isTooManyFDError(err error) bool { - perr, ok := err.(*os.PathError) - if ok && perr.Err == syscall.EMFILE { - return true - } - - return false + return n, n.Bootstrap(bootstrap.DefaultBootstrapConfig) } diff --git a/core/core.go b/core/core.go index 3056bd805b9..23aa84e7725 100644 --- a/core/core.go +++ b/core/core.go @@ -11,16 +11,13 @@ package core import ( "context" - "fmt" "io" - "io/ioutil" - "os" - "strings" - "time" "go.uber.org/fx" version "github.com/ipfs/go-ipfs" + "github.com/ipfs/go-ipfs/core/bootstrap" + "github.com/ipfs/go-ipfs/core/node" rp "github.com/ipfs/go-ipfs/exchange/reprovide" "github.com/ipfs/go-ipfs/filestore" "github.com/ipfs/go-ipfs/fuse/mount" @@ -32,27 +29,18 @@ import ( "github.com/ipfs/go-ipfs/repo" bserv "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" bstore "github.com/ipfs/go-ipfs-blockstore" - config "github.com/ipfs/go-ipfs-config" exchange "github.com/ipfs/go-ipfs-exchange-interface" - nilrouting "github.com/ipfs/go-ipfs-routing/none" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" - "github.com/ipfs/go-merkledag" "github.com/ipfs/go-mfs" "github.com/ipfs/go-path/resolver" - ft "github.com/ipfs/go-unixfs" "github.com/jbenet/goprocess" - "github.com/libp2p/go-libp2p" autonat "github.com/libp2p/go-libp2p-autonat-svc" - circuit "github.com/libp2p/go-libp2p-circuit" ic "github.com/libp2p/go-libp2p-crypto" p2phost "github.com/libp2p/go-libp2p-host" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" dht "github.com/libp2p/go-libp2p-kad-dht" - dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" metrics "github.com/libp2p/go-libp2p-metrics" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" @@ -63,18 +51,8 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery" p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/protocol/identify" - mafilter "github.com/libp2p/go-maddr-filter" - smux "github.com/libp2p/go-stream-muxer" - ma "github.com/multiformats/go-multiaddr" - mplex "github.com/whyrusleeping/go-smux-multiplex" - yamux "github.com/whyrusleeping/go-smux-yamux" - mamask "github.com/whyrusleeping/multiaddr-filter" ) -const kReprovideFrequency = time.Hour * 12 -const discoveryConnTimeout = time.Second * 30 -const DefaultIpnsCacheSize = 128 - var log = logging.Logger("core") func init() { @@ -90,16 +68,16 @@ type IpfsNode struct { Repo repo.Repo // Local node - Pinning pin.Pinner // the pinning manager - Mounts Mounts `optional:"true"` // current mount state, if any. - PrivateKey ic.PrivKey // the local node's private Key - PNetFingerprint PNetFingerprint `optional:"true"` // fingerprint of private network + Pinning pin.Pinner // the pinning manager + Mounts Mounts `optional:"true"` // current mount state, if any. 
+ PrivateKey ic.PrivKey // the local node's private Key + PNetFingerprint node.PNetFingerprint `optional:"true"` // fingerprint of private network // Services Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) Filestore *filestore.Filestore // the filestore blockstore - BaseBlocks BaseBlocks // the raw blockstore, no filestore wrapping + BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc Blocks bserv.BlockService // the block service, get/add blocks. DAG ipld.DAGService // the merkle dag service, get/add objects. @@ -143,94 +121,6 @@ type Mounts struct { Ipns mount.Mount } -func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { - var annAddrs []ma.Multiaddr - for _, addr := range cfg.Announce { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - annAddrs = append(annAddrs, maddr) - } - - filters := mafilter.NewFilters() - noAnnAddrs := map[string]bool{} - for _, addr := range cfg.NoAnnounce { - f, err := mamask.NewMask(addr) - if err == nil { - filters.AddDialFilter(f) - continue - } - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - noAnnAddrs[maddr.String()] = true - } - - return func(allAddrs []ma.Multiaddr) []ma.Multiaddr { - var addrs []ma.Multiaddr - if len(annAddrs) > 0 { - addrs = annAddrs - } else { - addrs = allAddrs - } - - var out []ma.Multiaddr - for _, maddr := range addrs { - // check for exact matches - ok := noAnnAddrs[maddr.String()] - // check for /ipcidr matches - if !ok && !filters.AddrBlocked(maddr) { - out = append(out, maddr) - } - } - return out - }, nil -} - -func makeSmuxTransportOption(mplexExp bool) libp2p.Option { - const yamuxID = "/yamux/1.0.0" - const mplexID = "/mplex/6.7.0" - - ymxtpt := &yamux.Transport{ - AcceptBacklog: 512, - ConnectionWriteTimeout: time.Second * 10, - KeepAliveInterval: time.Second * 30, - EnableKeepAlive: true, - MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB - LogOutput: ioutil.Discard, - } - - if os.Getenv("YAMUX_DEBUG") != "" { - ymxtpt.LogOutput = os.Stderr - } - - muxers := map[string]smux.Transport{yamuxID: ymxtpt} - if mplexExp { - muxers[mplexID] = mplex.DefaultTransport - } - - // Allow muxer preference order overriding - order := []string{yamuxID, mplexID} - if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { - order = strings.Fields(prefs) - } - - opts := make([]libp2p.Option, 0, len(order)) - for _, id := range order { - tpt, ok := muxers[id] - if !ok { - log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id) - continue - } - delete(muxers, id) - opts = append(opts, libp2p.Muxer(id, tpt)) - } - - return libp2p.ChainOptions(opts...) -} - // Close calls Close() on the App object func (n *IpfsNode) Close() error { return n.app.Stop(n.ctx) @@ -245,7 +135,7 @@ func (n *IpfsNode) Context() context.Context { } // Bootstrap will set and call the IpfsNodes bootstrap function. -func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error { +func (n *IpfsNode) Bootstrap(cfg bootstrap.BootstrapConfig) error { // TODO what should return value be when in offlineMode? 
if n.Routing == nil { return nil @@ -269,7 +159,7 @@ func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error { } var err error - n.Bootstrapper, err = Bootstrap(n, cfg) + n.Bootstrapper, err = bootstrap.Bootstrap(n.Identity, n.PeerHost, n.Routing, cfg) return err } @@ -283,20 +173,7 @@ func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) { if err != nil { return nil, err } - return toPeerInfos(parsed), nil -} - -func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { - var listen []ma.Multiaddr - for _, addr := range cfg.Addresses.Swarm { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm) - } - listen = append(listen, maddr) - } - - return listen, nil + return bootstrap.Peers.ToPeerInfos(parsed), nil } type ConstructPeerHostOpts struct { @@ -306,81 +183,3 @@ type ConstructPeerHostOpts struct { EnableRelayHop bool ConnectionManager ifconnmgr.ConnManager } - -type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) - -var DefaultHostOption HostOption = constructPeerHost - -// isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) { - pkey := ps.PrivKey(id) - if pkey == nil { - return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) - } - options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) - return libp2p.New(ctx, options...) -} - -func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - var raddrs []ma.Multiaddr - for _, addr := range addrs { - _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) - if err == nil { - continue - } - raddrs = append(raddrs, addr) - } - return raddrs -} - -func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory { - return func(addrs []ma.Multiaddr) []ma.Multiaddr { - return f(g(addrs)) - } -} - -// startListening on the network addresses -func startListening(host p2phost.Host, cfg *config.Config) error { - listenAddrs, err := listenAddresses(cfg) - if err != nil { - return err - } - - // Actually start listening: - if err := host.Network().Listen(listenAddrs...); err != nil { - return err - } - - // list out our addresses - addrs, err := host.Network().InterfaceListenAddresses() - if err != nil { - return err - } - log.Infof("Swarm listening at: %s", addrs) - return nil -} - -func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) { - return dht.New( - ctx, host, - dhtopts.Datastore(dstore), - dhtopts.Validator(validator), - ) -} - -func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) { - return dht.New( - ctx, host, - dhtopts.Client(true), - dhtopts.Datastore(dstore), - dhtopts.Validator(validator), - ) -} - -type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error) - -type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error) - -var DHTOption RoutingOption = constructDHTRouting -var DHTClientOption RoutingOption = constructClientDHTRouting -var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index f22803f9216..eaf870ec808 100644 --- 
a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/namesys" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" @@ -207,7 +208,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e cs := cfg.Ipns.ResolveCacheSize if cs == 0 { - cs = core.DefaultIpnsCacheSize + cs = node.DefaultIpnsCacheSize } if cs < 0 { return nil, fmt.Errorf("cannot specify negative resolve cache size") diff --git a/core/coreapi/test/api_test.go b/core/coreapi/test/api_test.go index 9ad164d8c30..23b8b6289d0 100644 --- a/core/coreapi/test/api_test.go +++ b/core/coreapi/test/api_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "testing" + "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/filestore" "github.com/ipfs/go-ipfs/core" @@ -101,7 +102,7 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int) return nil, err } - bsinf := core.BootstrapConfigWithPeers( + bsinf := bootstrap.BootstrapConfigWithPeers( []pstore.PeerInfo{ nodes[0].Peerstore.PeerInfo(nodes[0].Identity), }, diff --git a/core/mock/mock.go b/core/mock/mock.go index dc47917aa44..e759d20104f 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -5,6 +5,7 @@ import ( commands "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/repo" datastore "github.com/ipfs/go-datastore" @@ -29,7 +30,7 @@ func NewMockNode() (*core.IpfsNode, error) { }) } -func MockHostOption(mn mocknet.Mocknet) core.HostOption { +func MockHostOption(mn mocknet.Mocknet) node.HostOption { return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, _ ...libp2p.Option) (host.Host, error) { return mn.AddPeerWithPeerstore(id, ps) } diff --git a/core/ncore.go b/core/ncore.go index bcdc1d0bb24..4704a71af50 100644 --- a/core/ncore.go +++ b/core/ncore.go @@ -1,839 +1,21 @@ package core import ( - "bytes" "context" - "errors" - "fmt" - "github.com/ipfs/go-bitswap" - bsnet "github.com/ipfs/go-bitswap/network" - bserv "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - bstore "github.com/ipfs/go-ipfs-blockstore" - exchange "github.com/ipfs/go-ipfs-exchange-interface" - "github.com/ipfs/go-ipfs-exchange-offline" - u "github.com/ipfs/go-ipfs-util" - rp "github.com/ipfs/go-ipfs/exchange/reprovide" - "github.com/ipfs/go-ipfs/filestore" - "github.com/ipfs/go-ipfs/namesys" - ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" - "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/provider" - "github.com/ipfs/go-ipfs/thirdparty/cidv0v1" - "github.com/ipfs/go-ipfs/thirdparty/verifbs" - "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-ipns" - merkledag "github.com/ipfs/go-merkledag" - "github.com/ipfs/go-mfs" - ft "github.com/ipfs/go-unixfs" + "github.com/jbenet/goprocess" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-autonat-svc" - circuit "github.com/libp2p/go-libp2p-circuit" - connmgr "github.com/libp2p/go-libp2p-connmgr" - "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p-metrics" - pstore "github.com/libp2p/go-libp2p-peerstore" - "github.com/libp2p/go-libp2p-peerstore/pstoremem" - "github.com/libp2p/go-libp2p-pnet" - "github.com/libp2p/go-libp2p-pubsub" - psrouter "github.com/libp2p/go-libp2p-pubsub-router" - quic "github.com/libp2p/go-libp2p-quic-transport" - "github.com/libp2p/go-libp2p-record" - 
"github.com/libp2p/go-libp2p-routing" - rhelpers "github.com/libp2p/go-libp2p-routing-helpers" - "github.com/libp2p/go-libp2p/p2p/discovery" - rhost "github.com/libp2p/go-libp2p/p2p/host/routed" "go.uber.org/fx" - "time" - - "github.com/ipfs/go-ipfs/repo" - retry "github.com/ipfs/go-datastore/retrystore" iconfig "github.com/ipfs/go-ipfs-config" uio "github.com/ipfs/go-unixfs/io" - ic "github.com/libp2p/go-libp2p-crypto" - p2phost "github.com/libp2p/go-libp2p-host" - "github.com/libp2p/go-libp2p-peer" - mamask "github.com/whyrusleeping/multiaddr-filter" ) -func repoConfig(repo repo.Repo) (*iconfig.Config, error) { - return repo.Config() -} - -func identity(cfg *iconfig.Config) (peer.ID, error) { - cid := cfg.Identity.PeerID - if cid == "" { - return "", errors.New("identity was not set in config (was 'ipfs init' run?)") - } - if len(cid) == 0 { - return "", errors.New("no peer ID in config! (was 'ipfs init' run?)") - } - - id, err := peer.IDB58Decode(cid) - if err != nil { - return "", fmt.Errorf("peer ID invalid: %s", err) - } - - return id, nil -} - -func peerstore(id peer.ID, sk ic.PrivKey) pstore.Peerstore { - ps := pstoremem.NewPeerstore() - - if sk != nil { - ps.AddPrivKey(id, sk) - ps.AddPubKey(id, sk.GetPublic()) - } - - return ps -} - -func privateKey(cfg *iconfig.Config, id peer.ID) (ic.PrivKey, error) { - if cfg.Identity.PrivKey == "" { - return nil, nil - } - - sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!") - if err != nil { - return nil, err - } - - id2, err := peer.IDFromPrivateKey(sk) - if err != nil { - return nil, err - } - - if id2 != id { - return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2) - } - return sk, nil -} - -func datastoreCtor(repo repo.Repo) ds.Datastore { - return repo.Datastore() -} - -type BaseBlocks bstore.Blockstore - -func baseBlockstoreCtor(mctx MetricsCtx, repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs BaseBlocks, err error) { - rds := &retry.Datastore{ - Batching: repo.Datastore(), - Delay: time.Millisecond * 200, - Retries: 6, - TempErrFunc: isTooManyFDError, - } - // hash security - bs = bstore.NewBlockstore(rds) - bs = &verifbs.VerifBS{Blockstore: bs} - - opts := bstore.DefaultCacheOpts() - opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize - if !bcfg.Permanent { - opts.HasBloomFilterSize = 0 - } - - if !bcfg.NilRepo { - ctx, cancel := context.WithCancel(mctx) - - lc.Append(fx.Hook{ - OnStop: func(context context.Context) error { - cancel() - return nil - }, - }) - bs, err = bstore.CachedBlockstore(ctx, bs, opts) - if err != nil { - return nil, err - } - } - - bs = bstore.NewIdStore(bs) - bs = cidv0v1.NewBlockstore(bs) - - if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly? 
- bs.HashOnRead(true) - } - - return -} - -func gcBlockstoreCtor(lc fx.Lifecycle, repo repo.Repo, bb BaseBlocks, cfg *iconfig.Config) (gclocker bstore.GCLocker, gcbs bstore.GCBlockstore, bs bstore.Blockstore, fstore *filestore.Filestore) { - gclocker = bstore.NewGCLocker() - gcbs = bstore.NewGCBlockstore(bb, gclocker) - - if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled { - // hash security - fstore = filestore.NewFilestore(bb, repo.FileManager()) //TODO: mark optional - gcbs = bstore.NewGCBlockstore(fstore, gclocker) - gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs} - } - bs = gcbs - return -} - -func blockServiceCtor(lc fx.Lifecycle, bs bstore.Blockstore, rem exchange.Interface) bserv.BlockService { - bsvc := bserv.New(bs, rem) - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return bsvc.Close() - }, - }) - - return bsvc -} - -func recordValidator(ps pstore.Peerstore) record.Validator { - return record.NamespacedValidator{ - "pk": record.PublicKeyValidator{}, - "ipns": ipns.Validator{KeyBook: ps}, - } -} - -//////////////////// -// libp2p related - //////////////////// // libp2p -var ipfsp2p = fx.Options( - fx.Provide(p2pAddrFilters), - fx.Provide(p2pBandwidthCounter), - fx.Provide(p2pPNet), - fx.Provide(p2pAddrsFactory), - fx.Provide(p2pConnectionManager), - fx.Provide(p2pSmuxTransport), - fx.Provide(p2pNatPortMap), - fx.Provide(p2pRelay), - fx.Provide(p2pAutoRealy), - fx.Provide(p2pDefaultTransports), - fx.Provide(p2pQUIC), - - fx.Provide(p2pHostOption), - fx.Provide(p2pHost), - fx.Provide(p2pOnlineRouting), - - fx.Provide(pubsubCtor), - fx.Provide(newDiscoveryHandler), - - fx.Invoke(autoNATService), - fx.Invoke(p2pPNetChecker), - fx.Invoke(startListening), - fx.Invoke(setupDiscovery), -) - -func p2pHostOption(bcfg *BuildCfg) (hostOption HostOption, err error) { - hostOption = bcfg.Host - if bcfg.DisableEncryptedConnections { - innerHostOption := hostOption - hostOption = func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) { - return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...) - } - // TODO: shouldn't this be Errorf to guarantee visibility? - log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS. 
- You will not be able to connect to any nodes configured to use encrypted connections`) - } - return hostOption, nil -} - -func p2pAddrFilters(cfg *iconfig.Config) (opts libp2pOpts, err error) { - for _, s := range cfg.Swarm.AddrFilters { - f, err := mamask.NewMask(s) - if err != nil { - return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) - } - opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) - } - return opts, nil -} - -func p2pBandwidthCounter(cfg *iconfig.Config) (opts libp2pOpts, reporter metrics.Reporter) { - reporter = metrics.NewBandwidthCounter() - - if !cfg.Swarm.DisableBandwidthMetrics { - opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) - } - return opts, reporter -} - -type libp2pOpts struct { - fx.Out - - Opts []libp2p.Option `group:"libp2p"` -} - -type PNetFingerprint []byte // TODO: find some better place -func p2pPNet(repo repo.Repo) (opts libp2pOpts, fp PNetFingerprint, err error) { - swarmkey, err := repo.SwarmKey() - if err != nil || swarmkey == nil { - return opts, nil, err - } - - protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) - if err != nil { - return opts, nil, fmt.Errorf("failed to configure private network: %s", err) - } - fp = protec.Fingerprint() - - opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec)) - return opts, fp, nil -} - -func p2pPNetChecker(repo repo.Repo, ph p2phost.Host, lc fx.Lifecycle) error { - // TODO: better check? - swarmkey, err := repo.SwarmKey() - if err != nil || swarmkey == nil { - return err - } - - done := make(chan struct{}) - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - go func() { - t := time.NewTicker(30 * time.Second) - <-t.C // swallow one tick - for { - select { - case <-t.C: - if len(ph.Network().Peers()) == 0 { - log.Warning("We are in private network and have no peers.") - log.Warning("This might be configuration mistake.") - } - case <-done: - return - } - } - }() - return nil - }, - OnStop: func(_ context.Context) error { - close(done) - return nil - }, - }) - return nil -} - -func p2pAddrsFactory(cfg *iconfig.Config) (opts libp2pOpts, err error) { - addrsFactory, err := makeAddrsFactory(cfg.Addresses) - if err != nil { - return opts, err - } - if !cfg.Swarm.DisableRelay { - addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs) - } - opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory)) - return -} - -func p2pConnectionManager(cfg *iconfig.Config) (opts libp2pOpts, err error) { - grace := iconfig.DefaultConnMgrGracePeriod - low := iconfig.DefaultConnMgrHighWater - high := iconfig.DefaultConnMgrHighWater - - switch cfg.Swarm.ConnMgr.Type { - case "": - // 'default' value is the basic connection manager - return - case "none": - return opts, nil - case "basic": - grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod) - if err != nil { - return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err) - } - - low = cfg.Swarm.ConnMgr.LowWater - high = cfg.Swarm.ConnMgr.HighWater - default: - return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type) - } - - cm := connmgr.NewConnManager(low, high, grace) - opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm)) - return -} - -func p2pSmuxTransport(bcfg *BuildCfg) (opts libp2pOpts, err error) { - opts.Opts = append(opts.Opts, makeSmuxTransportOption(bcfg.getOpt("mplex"))) - return -} - -func p2pNatPortMap(cfg *iconfig.Config) (opts libp2pOpts, err error) { - if !cfg.Swarm.DisableNatPortMap { - opts.Opts = append(opts.Opts, 
libp2p.NATPortMap()) - } - return -} - -func p2pRelay(cfg *iconfig.Config) (opts libp2pOpts, err error) { - if cfg.Swarm.DisableRelay { - // Enabled by default. - opts.Opts = append(opts.Opts, libp2p.DisableRelay()) - } else { - relayOpts := []circuit.RelayOpt{circuit.OptDiscovery} - if cfg.Swarm.EnableRelayHop { - relayOpts = append(relayOpts, circuit.OptHop) - } - opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...)) - } - return -} - -func p2pAutoRealy(cfg *iconfig.Config) (opts libp2pOpts, err error) { - // enable autorelay - if cfg.Swarm.EnableAutoRelay { - opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay()) - } - return -} - -func p2pDefaultTransports() (opts libp2pOpts, err error) { - opts.Opts = append(opts.Opts, libp2p.DefaultTransports) - return -} - -func p2pQUIC(cfg *iconfig.Config) (opts libp2pOpts, err error) { - if cfg.Experimental.QUIC { - opts.Opts = append(opts.Opts, libp2p.Transport(quic.NewTransport)) - } - return -} - -type p2pHostIn struct { - fx.In - - BCfg *BuildCfg - Repo repo.Repo - Validator record.Validator - HostOption HostOption - ID peer.ID - Peerstore pstore.Peerstore - - Opts [][]libp2p.Option `group:"libp2p"` -} - -type BaseRouting routing.IpfsRouting -type p2pHostOut struct { - fx.Out - - Host p2phost.Host - Routing BaseRouting - IpfsDHT *dht.IpfsDHT -} - -// TODO: move some of this into params struct -func p2pHost(mctx MetricsCtx, lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) { - opts := []libp2p.Option{libp2p.NoListenAddrs} - for _, o := range params.Opts { - opts = append(opts, o...) - } - - ctx, cancel := context.WithCancel(mctx) - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - cancel() - return nil - }, - }) - - opts = append(opts, libp2p.Routing(func(h p2phost.Host) (routing.PeerRouting, error) { - r, err := params.BCfg.Routing(ctx, h, params.Repo.Datastore(), params.Validator) - out.Routing = r - return r, err - })) - - out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...) - if err != nil { - return p2pHostOut{}, err - } - - // this code is necessary just for tests: mock network constructions - // ignore the libp2p constructor options that actually construct the routing! - if out.Routing == nil { - r, err := params.BCfg.Routing(ctx, out.Host, params.Repo.Datastore(), params.Validator) - if err != nil { - return p2pHostOut{}, err - } - out.Routing = r - out.Host = rhost.Wrap(out.Host, out.Routing) - } - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return out.Host.Close() - }, - }) - - // TODO: break this up into more DI units - // TODO: I'm not a fan of type assertions like this but the - // `RoutingOption` system doesn't currently provide access to the - // IpfsNode. - // - // Ideally, we'd do something like: - // - // 1. Add some fancy method to introspect into tiered routers to extract - // things like the pubsub router or the DHT (complicated, messy, - // probably not worth it). - // 2. Pass the IpfsNode into the RoutingOption (would also remove the - // PSRouter case below. - // 3. Introduce some kind of service manager? (my personal favorite but - // that requires a fair amount of work). 
- if dht, ok := out.Routing.(*dht.IpfsDHT); ok { - out.IpfsDHT = dht - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return out.IpfsDHT.Close() - }, - }) - } - - return out, err -} - -type p2pRoutingIn struct { - fx.In - - BCfg *BuildCfg - Repo repo.Repo - Validator record.Validator - Host p2phost.Host - PubSub *pubsub.PubSub - - BaseRouting BaseRouting -} - -type p2pRoutingOut struct { - fx.Out - - IpfsRouting routing.IpfsRouting - PSRouter *psrouter.PubsubValueStore //TODO: optional -} - -func p2pOnlineRouting(mctx MetricsCtx, lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) { - out.IpfsRouting = in.BaseRouting - - if in.BCfg.getOpt("ipnsps") { - out.PSRouter = psrouter.NewPubsubValueStore( - lifecycleCtx(mctx, lc), - in.Host, - in.BaseRouting, - in.PubSub, - in.Validator, - ) - - out.IpfsRouting = rhelpers.Tiered{ - Routers: []routing.IpfsRouting{ - // Always check pubsub first. - &rhelpers.Compose{ - ValueStore: &rhelpers.LimitedValueStore{ - ValueStore: out.PSRouter, - Namespaces: []string{"ipns"}, - }, - }, - in.BaseRouting, - }, - Validator: in.Validator, - } - } - return out -} - -//////////// -// P2P services - -func autoNATService(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) error { - if !cfg.Swarm.EnableAutoNATService { - return nil - } - var opts []libp2p.Option - if cfg.Experimental.QUIC { - opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(quic.NewTransport)) - } - - _, err := autonat.NewAutoNATService(lifecycleCtx(mctx, lc), host, opts...) - return err -} - -func pubsubCtor(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig.Config) (service *pubsub.PubSub, err error) { - if !(bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps")) { - return nil, nil // TODO: mark optional - } - - var pubsubOptions []pubsub.Option - if cfg.Pubsub.DisableSigning { - pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false)) - } - - if cfg.Pubsub.StrictSignatureVerification { - pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true)) - } - - switch cfg.Pubsub.Router { - case "": - fallthrough - case "floodsub": - service, err = pubsub.NewFloodSub(lifecycleCtx(mctx, lc), host, pubsubOptions...) - - case "gossipsub": - service, err = pubsub.NewGossipSub(lifecycleCtx(mctx, lc), host, pubsubOptions...) - - default: - err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) - } - - return service, err -} - -//////////// -// Offline services - -// offline.Exchange -// offroute.NewOfflineRouter - -func offlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { - return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil -} - -//////////// -// IPFS services - -func pinning(bstore bstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { - internalDag := merkledag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore))) - pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag) - if err != nil { - // TODO: we should move towards only running 'NewPinner' explicitly on - // node init instead of implicitly here as a result of the pinner keys - // not being found in the datastore. 
- // this is kinda sketchy and could cause data loss - pinning = pin.NewPinner(repo.Datastore(), ds, internalDag) - } - - return pinning, nil -} - -func dagCtor(bs bserv.BlockService) format.DAGService { - return merkledag.NewDAGService(bs) -} - -func onlineExchangeCtor(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host, rt routing.IpfsRouting, bs bstore.GCBlockstore) exchange.Interface { - bitswapNetwork := bsnet.NewFromIpfsHost(host, rt) - exch := bitswap.New(lifecycleCtx(mctx, lc), bitswapNetwork, bs) - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return exch.Close() - }, - }) - return exch -} - -func onlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *iconfig.Config) (namesys.NameSystem, error) { - cs := cfg.Ipns.ResolveCacheSize - if cs == 0 { - cs = DefaultIpnsCacheSize - } - if cs < 0 { - return nil, fmt.Errorf("cannot specify negative resolve cache size") - } - return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil -} - -func ipnsRepublisher(lc lcProcess, cfg *iconfig.Config, namesys namesys.NameSystem, repo repo.Repo, privKey ic.PrivKey) error { - repub := ipnsrp.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) - - if cfg.Ipns.RepublishPeriod != "" { - d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) - if err != nil { - return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err) - } - - if !u.Debug && (d < time.Minute || d > (time.Hour*24)) { - return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d) - } - - repub.Interval = d - } - - if cfg.Ipns.RecordLifetime != "" { - d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) - if err != nil { - return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err) - } - - repub.RecordLifetime = d - } - - lc.Run(repub.Run) - return nil -} - -type discoveryHandler struct { - ctx context.Context - host p2phost.Host -} - -func (dh *discoveryHandler) HandlePeerFound(p pstore.PeerInfo) { - log.Warning("trying peer info: ", p) - ctx, cancel := context.WithTimeout(dh.ctx, discoveryConnTimeout) - defer cancel() - if err := dh.host.Connect(ctx, p); err != nil { - log.Warning("Failed to connect to peer found by discovery: ", err) - } -} - -func newDiscoveryHandler(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host) *discoveryHandler { - return &discoveryHandler{ - ctx: lifecycleCtx(mctx, lc), - host: host, - } -} - -func setupDiscovery(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, handler *discoveryHandler) error { - if cfg.Discovery.MDNS.Enabled { - mdns := cfg.Discovery.MDNS - if mdns.Interval == 0 { - mdns.Interval = 5 - } - service, err := discovery.NewMdnsService(lifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) - if err != nil { - log.Error("mdns error: ", err) - return nil - } - service.RegisterNotifee(handler) - } - return nil -} - -func providerQueue(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { - return provider.NewQueue(lifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) -} - -func providerCtor(mctx MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { - return provider.NewProvider(lifecycleCtx(mctx, lc), queue, rt) -} - -func reproviderCtor(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*rp.Reprovider, error) { - var keyProvider rp.KeyChanFunc - - switch 
cfg.Reprovider.Strategy { - case "all": - fallthrough - case "": - keyProvider = rp.NewBlockstoreProvider(bs) - case "roots": - keyProvider = rp.NewPinnedProvider(pinning, ds, true) - case "pinned": - keyProvider = rp.NewPinnedProvider(pinning, ds, false) - default: - return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) - } - return rp.NewReprovider(lifecycleCtx(mctx, lc), rt, keyProvider), nil -} - -func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error { - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - reproviderInterval = dur - } - - go reprovider.Run(reproviderInterval) - return nil -} - -func files(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { - dsk := ds.NewKey("/local/filesroot") - pf := func(ctx context.Context, c cid.Cid) error { - return repo.Datastore().Put(dsk, c.Bytes()) - } - - var nd *merkledag.ProtoNode - val, err := repo.Datastore().Get(dsk) - ctx := lifecycleCtx(mctx, lc) - - switch { - case err == ds.ErrNotFound || val == nil: - nd = ft.EmptyDirNode() - err := dag.Add(ctx, nd) - if err != nil { - return nil, fmt.Errorf("failure writing to dagstore: %s", err) - } - case err == nil: - c, err := cid.Cast(val) - if err != nil { - return nil, err - } - - rnd, err := dag.Get(ctx, c) - if err != nil { - return nil, fmt.Errorf("error loading filesroot from DAG: %s", err) - } - - pbnd, ok := rnd.(*merkledag.ProtoNode) - if !ok { - return nil, merkledag.ErrNotProtobuf - } - - nd = pbnd - default: - return nil, err - } - - root, err := mfs.NewRoot(ctx, dag, nd, pf) - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return root.Close() - }, - }) - - return root, err -} - -//////////// -// Hacks - -// lifecycleCtx creates a context which will be cancelled when lifecycle stops -// -// This is a hack which we need because most of our services use contexts in a -// wrong way -func lifecycleCtx(mctx MetricsCtx, lc fx.Lifecycle) context.Context { - ctx, cancel := context.WithCancel(mctx) - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - cancel() - return nil - }, - }) - return ctx -} - -type lcProcess struct { - fx.In - - LC fx.Lifecycle - Proc goprocess.Process -} - -func (lp *lcProcess) Run(f goprocess.ProcessFunc) { - proc := make(chan goprocess.Process, 1) - lp.LC.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - proc <- lp.Proc.Go(f) - return nil - }, - OnStop: func(ctx context.Context) error { - return (<-proc).Close() // todo: respect ctx, somehow - }, - }) +func setupSharding(cfg *iconfig.Config) { + // TEMP: setting global sharding switch here + uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled } func baseProcess(lc fx.Lifecycle) goprocess.Process { @@ -845,8 +27,3 @@ func baseProcess(lc fx.Lifecycle) goprocess.Process { }) return p } - -func setupSharding(cfg *iconfig.Config) { - // TEMP: setting global sharding switch here - uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled -} diff --git a/core/node/builder.go b/core/node/builder.go new file mode 100644 index 00000000000..112d1f7fe8f --- /dev/null +++ b/core/node/builder.go @@ -0,0 +1,36 @@ +package node + +import ( + "github.com/ipfs/go-ipfs/repo" +) + +type BuildCfg struct { + // If online is set, the node will have networking enabled + Online bool + + // ExtraOpts is a map of extra options used to configure the ipfs nodes creation + ExtraOpts 
map[string]bool + + // If Permanent is set, the node should run more expensive processes + // that will improve performance in the long run + Permanent bool + + // DisableEncryptedConnections disables connection encryption *entirely*. + // DO NOT SET THIS UNLESS YOU'RE TESTING. + DisableEncryptedConnections bool + + // If NilRepo is set, a Repo backed by a nil datastore will be constructed + NilRepo bool + + Routing RoutingOption + Host HostOption + Repo repo.Repo +} + +func (cfg *BuildCfg) getOpt(key string) bool { + if cfg.ExtraOpts == nil { + return false + } + + return cfg.ExtraOpts[key] +} diff --git a/core/node/core.go b/core/node/core.go new file mode 100644 index 00000000000..160a833cf26 --- /dev/null +++ b/core/node/core.go @@ -0,0 +1,117 @@ +package node + +import ( + "context" + "fmt" + + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" + offline "github.com/ipfs/go-ipfs-exchange-offline" + format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-mfs" + "github.com/ipfs/go-unixfs" + host "github.com/libp2p/go-libp2p-host" + routing "github.com/libp2p/go-libp2p-routing" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/repo" +) + +func BlockServiceCtor(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { + bsvc := blockservice.New(bs, rem) + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return bsvc.Close() + }, + }) + + return bsvc +} + +func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { + internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))) + pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag) + if err != nil { + // TODO: we should move towards only running 'NewPinner' explicitly on + // node init instead of implicitly here as a result of the pinner keys + // not being found in the datastore.
+ // this is kinda sketchy and could cause data loss + pinning = pin.NewPinner(repo.Datastore(), ds, internalDag) + } + + return pinning, nil +} + +func DagCtor(bs blockservice.BlockService) format.DAGService { + return merkledag.NewDAGService(bs) +} + +func OnlineExchangeCtor(mctx MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface { + bitswapNetwork := network.NewFromIpfsHost(host, rt) + exch := bitswap.New(lifecycleCtx(mctx, lc), bitswapNetwork, bs) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return exch.Close() + }, + }) + return exch +} + +func Files(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { + dsk := datastore.NewKey("/local/filesroot") + pf := func(ctx context.Context, c cid.Cid) error { + return repo.Datastore().Put(dsk, c.Bytes()) + } + + var nd *merkledag.ProtoNode + val, err := repo.Datastore().Get(dsk) + ctx := lifecycleCtx(mctx, lc) + + switch { + case err == datastore.ErrNotFound || val == nil: + nd = unixfs.EmptyDirNode() + err := dag.Add(ctx, nd) + if err != nil { + return nil, fmt.Errorf("failure writing to dagstore: %s", err) + } + case err == nil: + c, err := cid.Cast(val) + if err != nil { + return nil, err + } + + rnd, err := dag.Get(ctx, c) + if err != nil { + return nil, fmt.Errorf("error loading filesroot from DAG: %s", err) + } + + pbnd, ok := rnd.(*merkledag.ProtoNode) + if !ok { + return nil, merkledag.ErrNotProtobuf + } + + nd = pbnd + default: + return nil, err + } + + root, err := mfs.NewRoot(ctx, dag, nd, pf) + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return root.Close() + }, + }) + + return root, err +} + +type MetricsCtx context.Context diff --git a/core/node/discovery.go b/core/node/discovery.go new file mode 100644 index 00000000000..f51e7593d87 --- /dev/null +++ b/core/node/discovery.go @@ -0,0 +1,51 @@ +package node + +import ( + "context" + "time" + + "github.com/ipfs/go-ipfs-config" + "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p/p2p/discovery" + "go.uber.org/fx" +) + +const discoveryConnTimeout = time.Second * 30 + +type discoveryHandler struct { + ctx context.Context + host host.Host +} + +func (dh *discoveryHandler) HandlePeerFound(p peerstore.PeerInfo) { + log.Warning("trying peer info: ", p) + ctx, cancel := context.WithTimeout(dh.ctx, discoveryConnTimeout) + defer cancel() + if err := dh.host.Connect(ctx, p); err != nil { + log.Warning("Failed to connect to peer found by discovery: ", err) + } +} + +func NewDiscoveryHandler(mctx MetricsCtx, lc fx.Lifecycle, host host.Host) *discoveryHandler { + return &discoveryHandler{ + ctx: lifecycleCtx(mctx, lc), + host: host, + } +} + +func SetupDiscovery(mctx MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, handler *discoveryHandler) error { + if cfg.Discovery.MDNS.Enabled { + mdns := cfg.Discovery.MDNS + if mdns.Interval == 0 { + mdns.Interval = 5 + } + service, err := discovery.NewMdnsService(lifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) + if err != nil { + log.Error("mdns error: ", err) + return nil + } + service.RegisterNotifee(handler) + } + return nil +} diff --git a/core/node/groups.go b/core/node/groups.go new file mode 100644 index 00000000000..202c0c4f2a9 --- /dev/null +++ b/core/node/groups.go @@ -0,0 +1,88 @@ +package node + +import ( + offline "github.com/ipfs/go-ipfs-exchange-offline" + "go.uber.org/fx" + + 
offroute "github.com/ipfs/go-ipfs-routing/offline" + "github.com/ipfs/go-ipfs/p2p" + "github.com/ipfs/go-ipfs/provider" +) + +var LibP2P = fx.Options( + fx.Provide(P2PAddrFilters), + fx.Provide(P2PBandwidthCounter), + fx.Provide(P2PPNet), + fx.Provide(P2PAddrsFactory), + fx.Provide(P2PConnectionManager), + fx.Provide(P2PSmuxTransport), + fx.Provide(P2PNatPortMap), + fx.Provide(P2PRelay), + fx.Provide(P2PAutoRealy), + fx.Provide(P2PDefaultTransports), + fx.Provide(P2PQUIC), + + fx.Provide(P2PHostOption), + fx.Provide(P2PHost), + fx.Provide(P2POnlineRouting), + + fx.Provide(Pubsub), + fx.Provide(NewDiscoveryHandler), + + fx.Invoke(AutoNATService), + fx.Invoke(P2PPNetChecker), + fx.Invoke(StartListening), + fx.Invoke(SetupDiscovery), +) + +var Storage = fx.Options( + fx.Provide(RepoConfig), + fx.Provide(DatastoreCtor), + fx.Provide(BaseBlockstoreCtor), + fx.Provide(GcBlockstoreCtor), +) + +var Identity = fx.Options( + fx.Provide(PeerID), + fx.Provide(PrivateKey), + fx.Provide(Peerstore), +) + +var IPNS = fx.Options( + fx.Provide(RecordValidator), +) + +var Providers = fx.Options( + fx.Provide(ProviderQueue), + fx.Provide(ProviderCtor), + fx.Provide(ReproviderCtor), + + fx.Invoke(Reprovider), + fx.Invoke(provider.Provider.Run), +) + +var Online = fx.Options( + fx.Provide(OnlineExchangeCtor), + fx.Provide(OnlineNamesysCtor), + + fx.Invoke(IpnsRepublisher), + + fx.Provide(p2p.NewP2P), + + LibP2P, + Providers, +) + +var Offline = fx.Options( + fx.Provide(offline.Exchange), + fx.Provide(OfflineNamesysCtor), + fx.Provide(offroute.NewOfflineRouter), + fx.Provide(provider.NewOfflineProvider), +) + +func Networked(online bool) fx.Option { + if online { + return Online + } + return Offline +} diff --git a/core/node/helpers.go b/core/node/helpers.go new file mode 100644 index 00000000000..a785124a33f --- /dev/null +++ b/core/node/helpers.go @@ -0,0 +1,43 @@ +package node + +import ( + "context" + + "github.com/jbenet/goprocess" + "go.uber.org/fx" +) + +// lifecycleCtx creates a context which will be cancelled when lifecycle stops +// +// This is a hack which we need because most of our services use contexts in a +// wrong way +func lifecycleCtx(mctx MetricsCtx, lc fx.Lifecycle) context.Context { + ctx, cancel := context.WithCancel(mctx) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) + return ctx +} + +type lcProcess struct { + fx.In + + LC fx.Lifecycle + Proc goprocess.Process +} + +func (lp *lcProcess) Run(f goprocess.ProcessFunc) { + proc := make(chan goprocess.Process, 1) + lp.LC.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + proc <- lp.Proc.Go(f) + return nil + }, + OnStop: func(ctx context.Context) error { + return (<-proc).Close() // todo: respect ctx, somehow + }, + }) +} diff --git a/core/node/identity.go b/core/node/identity.go new file mode 100644 index 00000000000..eb390309875 --- /dev/null +++ b/core/node/identity.go @@ -0,0 +1,48 @@ +package node + +import ( + "errors" + "fmt" + + "github.com/ipfs/go-ipfs-config" + "github.com/libp2p/go-libp2p-crypto" + "github.com/libp2p/go-libp2p-peer" +) + +func PeerID(cfg *config.Config) (peer.ID, error) { + cid := cfg.Identity.PeerID + if cid == "" { + return "", errors.New("identity was not set in config (was 'ipfs init' run?)") + } + if len(cid) == 0 { + return "", errors.New("no peer ID in config! 
(was 'ipfs init' run?)") + } + + id, err := peer.IDB58Decode(cid) + if err != nil { + return "", fmt.Errorf("peer ID invalid: %s", err) + } + + return id, nil +} + +func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) { + if cfg.Identity.PrivKey == "" { + return nil, nil + } + + sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!") + if err != nil { + return nil, err + } + + id2, err := peer.IDFromPrivateKey(sk) + if err != nil { + return nil, err + } + + if id2 != id { + return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2) + } + return sk, nil +} diff --git a/core/node/ipns.go b/core/node/ipns.go new file mode 100644 index 00000000000..1e8511d2cbd --- /dev/null +++ b/core/node/ipns.go @@ -0,0 +1,71 @@ +package node + +import ( + "fmt" + "time" + + "github.com/ipfs/go-ipfs-config" + "github.com/ipfs/go-ipfs-util" + "github.com/ipfs/go-ipns" + "github.com/libp2p/go-libp2p-crypto" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-routing" + + "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/namesys/republisher" + "github.com/ipfs/go-ipfs/repo" +) + +const DefaultIpnsCacheSize = 128 + +func RecordValidator(ps peerstore.Peerstore) record.Validator { + return record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": ipns.Validator{KeyBook: ps}, + } +} + +func OfflineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { + return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil +} + +func OnlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) { + cs := cfg.Ipns.ResolveCacheSize + if cs == 0 { + cs = DefaultIpnsCacheSize + } + if cs < 0 { + return nil, fmt.Errorf("cannot specify negative resolve cache size") + } + return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil +} + +func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error { + repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) + + if cfg.Ipns.RepublishPeriod != "" { + d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) + if err != nil { + return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err) + } + + if !util.Debug && (d < time.Minute || d > (time.Hour*24)) { + return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d) + } + + repub.Interval = d + } + + if cfg.Ipns.RecordLifetime != "" { + d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) + if err != nil { + return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err) + } + + repub.RecordLifetime = d + } + + lc.Run(repub.Run) + return nil +} diff --git a/core/node/libp2p.go b/core/node/libp2p.go new file mode 100644 index 00000000000..a2f7c425322 --- /dev/null +++ b/core/node/libp2p.go @@ -0,0 +1,597 @@ +package node + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "strings" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-ipfs-config" + nilrouting "github.com/ipfs/go-ipfs-routing/none" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-autonat-svc" + "github.com/libp2p/go-libp2p-circuit" + circuit "github.com/libp2p/go-libp2p-circuit" + "github.com/libp2p/go-libp2p-connmgr" + "github.com/libp2p/go-libp2p-crypto" + "github.com/libp2p/go-libp2p-host" + 
"github.com/libp2p/go-libp2p-kad-dht" + dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" + "github.com/libp2p/go-libp2p-metrics" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-peerstore/pstoremem" + "github.com/libp2p/go-libp2p-pnet" + "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p-pubsub-router" + "github.com/libp2p/go-libp2p-quic-transport" + "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-routing" + "github.com/libp2p/go-libp2p-routing-helpers" + p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/host/routed" + mafilter "github.com/libp2p/go-maddr-filter" + smux "github.com/libp2p/go-stream-muxer" + ma "github.com/multiformats/go-multiaddr" + mplex "github.com/whyrusleeping/go-smux-multiplex" + yamux "github.com/whyrusleeping/go-smux-yamux" + "github.com/whyrusleeping/multiaddr-filter" + mamask "github.com/whyrusleeping/multiaddr-filter" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/repo" +) + +var log = logging.Logger("node") + +type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) +type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.IpfsRouting, error) + +var DefaultHostOption HostOption = constructPeerHost + +// isolates the complex initialization steps +func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) { + pkey := ps.PrivKey(id) + if pkey == nil { + return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) + } + options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) + return libp2p.New(ctx, options...) 
+} + +func constructDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) +} + +func constructClientDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Client(true), + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) +} + +var DHTOption RoutingOption = constructDHTRouting +var DHTClientOption RoutingOption = constructClientDHTRouting +var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting + +func Peerstore(id peer.ID, sk crypto.PrivKey) peerstore.Peerstore { + ps := pstoremem.NewPeerstore() + + if sk != nil { + ps.AddPrivKey(id, sk) + ps.AddPubKey(id, sk.GetPublic()) + } + + return ps +} + +func P2PAddrFilters(cfg *config.Config) (opts Libp2pOpts, err error) { + for _, s := range cfg.Swarm.AddrFilters { + f, err := mask.NewMask(s) + if err != nil { + return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) + } + opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) + } + return opts, nil +} + +func P2PBandwidthCounter(cfg *config.Config) (opts Libp2pOpts, reporter metrics.Reporter) { + reporter = metrics.NewBandwidthCounter() + + if !cfg.Swarm.DisableBandwidthMetrics { + opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) + } + return opts, reporter +} + +type Libp2pOpts struct { + fx.Out + + Opts []libp2p.Option `group:"libp2p"` +} + +type PNetFingerprint []byte // TODO: find some better place +func P2PPNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) { + swarmkey, err := repo.SwarmKey() + if err != nil || swarmkey == nil { + return opts, nil, err + } + + protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) + if err != nil { + return opts, nil, fmt.Errorf("failed to configure private network: %s", err) + } + fp = protec.Fingerprint() + + opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec)) + return opts, fp, nil +} + +func P2PPNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error { + // TODO: better check? 
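+	// A nil swarm key simply means this repo is not part of a private
+	// network, so there is nothing to check. Otherwise, periodically warn
+	// when the node has no peers at all, which usually indicates a
+	// misconfigured private network.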
+	swarmkey, err := repo.SwarmKey()
+	if err != nil || swarmkey == nil {
+		return err
+	}
+
+	done := make(chan struct{})
+	lc.Append(fx.Hook{
+		OnStart: func(_ context.Context) error {
+			go func() {
+				t := time.NewTicker(30 * time.Second)
+				defer t.Stop()
+
+				<-t.C // swallow one tick
+				for {
+					select {
+					case <-t.C:
+						if len(ph.Network().Peers()) == 0 {
+							log.Warning("This node is in a private network and has no peers.")
+							log.Warning("This might be a configuration mistake.")
+						}
+					case <-done:
+						return
+					}
+				}
+			}()
+			return nil
+		},
+		OnStop: func(_ context.Context) error {
+			close(done)
+			return nil
+		},
+	})
+	return nil
+}
+
+func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
+	var annAddrs []ma.Multiaddr
+	for _, addr := range cfg.Announce {
+		maddr, err := ma.NewMultiaddr(addr)
+		if err != nil {
+			return nil, err
+		}
+		annAddrs = append(annAddrs, maddr)
+	}
+
+	filters := mafilter.NewFilters()
+	noAnnAddrs := map[string]bool{}
+	for _, addr := range cfg.NoAnnounce {
+		f, err := mask.NewMask(addr)
+		if err == nil {
+			filters.AddDialFilter(f)
+			continue
+		}
+		maddr, err := ma.NewMultiaddr(addr)
+		if err != nil {
+			return nil, err
+		}
+		noAnnAddrs[maddr.String()] = true
+	}
+
+	return func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
+		var addrs []ma.Multiaddr
+		if len(annAddrs) > 0 {
+			addrs = annAddrs
+		} else {
+			addrs = allAddrs
+		}
+
+		var out []ma.Multiaddr
+		for _, maddr := range addrs {
+			// check for exact matches
+			ok := noAnnAddrs[maddr.String()]
+			// check for /ipcidr matches
+			if !ok && !filters.AddrBlocked(maddr) {
+				out = append(out, maddr)
+			}
+		}
+		return out
+	}, nil
+}
+
+func P2PAddrsFactory(cfg *config.Config) (opts Libp2pOpts, err error) {
+	addrsFactory, err := makeAddrsFactory(cfg.Addresses)
+	if err != nil {
+		return opts, err
+	}
+	if !cfg.Swarm.DisableRelay {
+		addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs)
+	}
+	opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
+	return
+}
+
+func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
+	var raddrs []ma.Multiaddr
+	for _, addr := range addrs {
+		_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
+		if err == nil {
+			continue
+		}
+		raddrs = append(raddrs, addr)
+	}
+	return raddrs
+}
+
+func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory {
+	return func(addrs []ma.Multiaddr) []ma.Multiaddr {
+		return f(g(addrs))
+	}
+}
+
+func P2PConnectionManager(cfg *config.Config) (opts Libp2pOpts, err error) {
+	grace := config.DefaultConnMgrGracePeriod
+	low := config.DefaultConnMgrLowWater
+	high := config.DefaultConnMgrHighWater
+
+	switch cfg.Swarm.ConnMgr.Type {
+	case "":
+		// 'default' value is the basic connection manager
+		return
+	case "none":
+		return opts, nil
+	case "basic":
+		grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
+		if err != nil {
+			return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
+		}
+
+		low = cfg.Swarm.ConnMgr.LowWater
+		high = cfg.Swarm.ConnMgr.HighWater
+	default:
+		return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type)
+	}
+
+	cm := connmgr.NewConnManager(low, high, grace)
+	opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
+	return
+}
+
+func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
+	const yamuxID = "/yamux/1.0.0"
+	const mplexID = "/mplex/6.7.0"
+
+	ymxtpt := &yamux.Transport{
+		AcceptBacklog:          512,
+		ConnectionWriteTimeout: time.Second * 10,
+		KeepAliveInterval:      time.Second * 30,
+		EnableKeepAlive:        true,
+		MaxStreamWindowSize:    uint32(16 * 1024 * 1024), // 16MiB
+		LogOutput:              ioutil.Discard,
+	}
+
+	if os.Getenv("YAMUX_DEBUG") != "" {
+		ymxtpt.LogOutput = os.Stderr
+	}
+
+	muxers := map[string]smux.Transport{yamuxID: ymxtpt}
+	if mplexExp {
+		muxers[mplexID] = mplex.DefaultTransport
+	}
+
+	// Allow muxer preference order overriding
+	order := []string{yamuxID, mplexID}
+	if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
+		order = strings.Fields(prefs)
+	}
+
+	opts := make([]libp2p.Option, 0, len(order))
+	for _, id := range order {
+		tpt, ok := muxers[id]
+		if !ok {
+			log.Warningf("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
+			continue
+		}
+		delete(muxers, id)
+		opts = append(opts, libp2p.Muxer(id, tpt))
+	}
+
+	return libp2p.ChainOptions(opts...)
+}
+
+func P2PSmuxTransport(bcfg *BuildCfg) (opts Libp2pOpts, err error) {
+	opts.Opts = append(opts.Opts, makeSmuxTransportOption(bcfg.getOpt("mplex")))
+	return
+}
+
+func P2PNatPortMap(cfg *config.Config) (opts Libp2pOpts, err error) {
+	if !cfg.Swarm.DisableNatPortMap {
+		opts.Opts = append(opts.Opts, libp2p.NATPortMap())
+	}
+	return
+}
+
+func P2PRelay(cfg *config.Config) (opts Libp2pOpts, err error) {
+	if cfg.Swarm.DisableRelay {
+		// Enabled by default.
+		opts.Opts = append(opts.Opts, libp2p.DisableRelay())
+	} else {
+		relayOpts := []circuit.RelayOpt{circuit.OptDiscovery}
+		if cfg.Swarm.EnableRelayHop {
+			relayOpts = append(relayOpts, circuit.OptHop)
+		}
+		opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
+	}
+	return
+}
+
+func P2PAutoRelay(cfg *config.Config) (opts Libp2pOpts, err error) {
+	// enable autorelay
+	if cfg.Swarm.EnableAutoRelay {
+		opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay())
+	}
+	return
+}
+
+func P2PDefaultTransports() (opts Libp2pOpts, err error) {
+	opts.Opts = append(opts.Opts, libp2p.DefaultTransports)
+	return
+}
+
+func P2PQUIC(cfg *config.Config) (opts Libp2pOpts, err error) {
+	if cfg.Experimental.QUIC {
+		opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport))
+	}
+	return
+}
+
+type P2PHostIn struct {
+	fx.In
+
+	BCfg       *BuildCfg
+	Repo       repo.Repo
+	Validator  record.Validator
+	HostOption HostOption
+	ID         peer.ID
+	Peerstore  peerstore.Peerstore
+
+	Opts [][]libp2p.Option `group:"libp2p"`
+}
+
+type BaseRouting routing.IpfsRouting
+
+type P2PHostOut struct {
+	fx.Out
+
+	Host    host.Host
+	Routing BaseRouting
+	IpfsDHT *dht.IpfsDHT
+}
+
+// TODO: move some of this into params struct
+func P2PHost(mctx MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
+	opts := []libp2p.Option{libp2p.NoListenAddrs}
+	for _, o := range params.Opts {
+		opts = append(opts, o...)
+	}
+
+	ctx, cancel := context.WithCancel(mctx)
+	lc.Append(fx.Hook{
+		OnStop: func(_ context.Context) error {
+			cancel()
+			return nil
+		},
+	})
+
+	opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
+		r, err := params.BCfg.Routing(ctx, h, params.Repo.Datastore(), params.Validator)
+		out.Routing = r
+		return r, err
+	}))
+
+	out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...)
+	if err != nil {
+		return P2PHostOut{}, err
+	}
+
+	// this code is necessary just for tests: mock network constructions
+	// ignore the libp2p constructor options that actually construct the routing!
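+	// When that happens we construct the routing system ourselves and wrap
+	// the host so lookups for unknown peers still go through routing.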
+	if out.Routing == nil {
+		r, err := params.BCfg.Routing(ctx, out.Host, params.Repo.Datastore(), params.Validator)
+		if err != nil {
+			return P2PHostOut{}, err
+		}
+		out.Routing = r
+		out.Host = routedhost.Wrap(out.Host, out.Routing)
+	}
+
+	lc.Append(fx.Hook{
+		OnStop: func(ctx context.Context) error {
+			return out.Host.Close()
+		},
+	})
+
+	// TODO: break this up into more DI units
+	// TODO: I'm not a fan of type assertions like this but the
+	// `RoutingOption` system doesn't currently provide access to the
+	// IpfsNode.
+	//
+	// Ideally, we'd do something like:
+	//
+	// 1. Add some fancy method to introspect into tiered routers to extract
+	//    things like the pubsub router or the DHT (complicated, messy,
+	//    probably not worth it).
+	// 2. Pass the IpfsNode into the RoutingOption (would also remove the
+	//    PSRouter case below).
+	// 3. Introduce some kind of service manager? (my personal favorite but
+	//    that requires a fair amount of work).
+	if d, ok := out.Routing.(*dht.IpfsDHT); ok {
+		out.IpfsDHT = d
+
+		lc.Append(fx.Hook{
+			OnStop: func(ctx context.Context) error {
+				return out.IpfsDHT.Close()
+			},
+		})
+	}
+
+	return out, err
+}
+
+type p2pRoutingIn struct {
+	fx.In
+
+	BCfg      *BuildCfg
+	Repo      repo.Repo
+	Validator record.Validator
+	Host      host.Host
+	PubSub    *pubsub.PubSub
+
+	BaseRouting BaseRouting
+}
+
+type p2pRoutingOut struct {
+	fx.Out
+
+	IpfsRouting routing.IpfsRouting
+	PSRouter    *psrouter.PubsubValueStore // TODO: optional
+}
+
+func P2POnlineRouting(mctx MetricsCtx, lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) {
+	out.IpfsRouting = in.BaseRouting
+
+	if in.BCfg.getOpt("ipnsps") {
+		out.PSRouter = psrouter.NewPubsubValueStore(
+			lifecycleCtx(mctx, lc),
+			in.Host,
+			in.BaseRouting,
+			in.PubSub,
+			in.Validator,
+		)
+
+		out.IpfsRouting = routinghelpers.Tiered{
+			Routers: []routing.IpfsRouting{
+				// Always check pubsub first.
+				&routinghelpers.Compose{
+					ValueStore: &routinghelpers.LimitedValueStore{
+						ValueStore: out.PSRouter,
+						Namespaces: []string{"ipns"},
+					},
+				},
+				in.BaseRouting,
+			},
+			Validator: in.Validator,
+		}
+	}
+	return out
+}
+
+func AutoNATService(mctx MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host) error {
+	if !cfg.Swarm.EnableAutoNATService {
+		return nil
+	}
+	var opts []libp2p.Option
+	if cfg.Experimental.QUIC {
+		opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
+	}
+
+	_, err := autonat.NewAutoNATService(lifecycleCtx(mctx, lc), host, opts...)
+	return err
+}
+
+func Pubsub(mctx MetricsCtx, lc fx.Lifecycle, host host.Host, bcfg *BuildCfg, cfg *config.Config) (service *pubsub.PubSub, err error) {
+	if !(bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps")) {
+		return nil, nil // TODO: mark optional
+	}
+
+	var pubsubOptions []pubsub.Option
+	if cfg.Pubsub.DisableSigning {
+		pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
+	}
+
+	if cfg.Pubsub.StrictSignatureVerification {
+		pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
+	}
+
+	switch cfg.Pubsub.Router {
+	case "":
+		fallthrough
+	case "floodsub":
+		service, err = pubsub.NewFloodSub(lifecycleCtx(mctx, lc), host, pubsubOptions...)
+
+	case "gossipsub":
+		service, err = pubsub.NewGossipSub(lifecycleCtx(mctx, lc), host, pubsubOptions...)
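+		// gossipsub is the newer mesh-based router; floodsub (also used for
+		// an empty Router value) remains the conservative default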
+
+	default:
+		err = fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router)
+	}
+
+	return service, err
+}
+
+func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
+	var listen []ma.Multiaddr
+	for _, addr := range cfg.Addresses.Swarm {
+		maddr, err := ma.NewMultiaddr(addr)
+		if err != nil {
+			return nil, fmt.Errorf("failure to parse config.Addresses.Swarm entry %q: %s", addr, err)
+		}
+		listen = append(listen, maddr)
+	}
+
+	return listen, nil
+}
+
+func StartListening(host host.Host, cfg *config.Config) error {
+	listenAddrs, err := listenAddresses(cfg)
+	if err != nil {
+		return err
+	}
+
+	// Actually start listening:
+	if err := host.Network().Listen(listenAddrs...); err != nil {
+		return err
+	}
+
+	// list out our addresses
+	addrs, err := host.Network().InterfaceListenAddresses()
+	if err != nil {
+		return err
+	}
+	log.Infof("Swarm listening at: %s", addrs)
+	return nil
+}
+
+func P2PHostOption(bcfg *BuildCfg) (hostOption HostOption, err error) {
+	hostOption = bcfg.Host
+	if bcfg.DisableEncryptedConnections {
+		innerHostOption := hostOption
+		hostOption = func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) {
+			return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...)
+		}
+		// TODO: shouldn't this be Errorf to guarantee visibility?
+		log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS.
+You will not be able to connect to any nodes configured to use encrypted connections`)
+	}
+	return hostOption, nil
+}
diff --git a/core/node/provider.go b/core/node/provider.go
new file mode 100644
index 00000000000..8c679244014
--- /dev/null
+++ b/core/node/provider.go
@@ -0,0 +1,59 @@
+package node
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/ipfs/go-ipfs-config"
+	"github.com/ipfs/go-ipld-format"
+	"github.com/libp2p/go-libp2p-routing"
+	"go.uber.org/fx"
+
+	"github.com/ipfs/go-ipfs/exchange/reprovide"
+	"github.com/ipfs/go-ipfs/pin"
+	"github.com/ipfs/go-ipfs/provider"
+	"github.com/ipfs/go-ipfs/repo"
+)
+
+const kReprovideFrequency = time.Hour * 12
+
+func ProviderQueue(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) {
+	return provider.NewQueue(lifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
+}
+
+func ProviderCtor(mctx MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider {
+	return provider.NewProvider(lifecycleCtx(mctx, lc), queue, rt)
+}
+
+func ReproviderCtor(mctx MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
+	var keyProvider reprovide.KeyChanFunc
+
+	switch cfg.Reprovider.Strategy {
+	case "all":
+		fallthrough
+	case "":
+		keyProvider = reprovide.NewBlockstoreProvider(bs)
+	case "roots":
+		keyProvider = reprovide.NewPinnedProvider(pinning, ds, true)
+	case "pinned":
+		keyProvider = reprovide.NewPinnedProvider(pinning, ds, false)
+	default:
+		return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
+	}
+	return reprovide.NewReprovider(lifecycleCtx(mctx, lc), rt, keyProvider), nil
+}
+
+func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error {
+	reproviderInterval := kReprovideFrequency
+	if cfg.Reprovider.Interval != "" {
+		dur, err := time.ParseDuration(cfg.Reprovider.Interval)
+		if err != nil {
+			return err
+		}
+
+		reproviderInterval = dur
+	}
+
+	go reprovider.Run(reproviderInterval)
+	return nil
+}
diff --git a/core/node/storage.go b/core/node/storage.go
new file mode 100644
index 00000000000..e956293c6de
--- /dev/null
+++ b/core/node/storage.go
@@ -0,0 +1,94 @@
+package node
+
+import (
+	"context"
+	"os"
+	"syscall"
+	"time"
+
+	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/retrystore"
+	blockstore "github.com/ipfs/go-ipfs-blockstore"
+	config "github.com/ipfs/go-ipfs-config"
+	"go.uber.org/fx"
+
+	"github.com/ipfs/go-ipfs/filestore"
+	"github.com/ipfs/go-ipfs/repo"
+	"github.com/ipfs/go-ipfs/thirdparty/cidv0v1"
+	"github.com/ipfs/go-ipfs/thirdparty/verifbs"
+)
+
+func isTooManyFDError(err error) bool {
+	perr, ok := err.(*os.PathError)
+	return ok && perr.Err == syscall.EMFILE
+}
+
+func RepoConfig(repo repo.Repo) (*config.Config, error) {
+	return repo.Config()
+}
+
+func DatastoreCtor(repo repo.Repo) datastore.Datastore {
+	return repo.Datastore()
+}
+
+type BaseBlocks blockstore.Blockstore
+
+func BaseBlockstoreCtor(mctx MetricsCtx, repo repo.Repo, cfg *config.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs BaseBlocks, err error) {
+	rds := &retrystore.Datastore{
+		Batching:    repo.Datastore(),
+		Delay:       time.Millisecond * 200,
+		Retries:     6,
+		TempErrFunc: isTooManyFDError,
+	}
+	// hash security
+	bs = blockstore.NewBlockstore(rds)
+	bs = &verifbs.VerifBS{Blockstore: bs}
+
+	opts := blockstore.DefaultCacheOpts()
+	opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
+	if !bcfg.Permanent {
+		opts.HasBloomFilterSize = 0
+	}
+
+	if !bcfg.NilRepo {
+		ctx, cancel := context.WithCancel(mctx)
+
+		lc.Append(fx.Hook{
+			OnStop: func(_ context.Context) error {
+				cancel()
+				return nil
+			},
+		})
+		bs, err = blockstore.CachedBlockstore(ctx, bs, opts)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	bs = blockstore.NewIdStore(bs)
+	bs = cidv0v1.NewBlockstore(bs)
+
+	if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
+		bs.HashOnRead(true)
+	}
+
+	return
+}
+
+func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
+	gclocker = blockstore.NewGCLocker()
+	gcbs = blockstore.NewGCBlockstore(bb, gclocker)
+
+	if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
+		// hash security
+		fstore = filestore.NewFilestore(bb, repo.FileManager()) // TODO: mark optional
+		gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
+		gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
+	}
+	bs = gcbs
+	return
+}
diff --git a/namesys/republisher/repub_test.go b/namesys/republisher/repub_test.go
index 8f0048c4c8f..48a0b086f0c 100644
--- a/namesys/republisher/repub_test.go
+++ b/namesys/republisher/repub_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/ipfs/go-ipfs/core"
+	"github.com/ipfs/go-ipfs/core/bootstrap"
 	mock "github.com/ipfs/go-ipfs/core/mock"
 	namesys "github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/namesys/republisher" @@ -45,7 +46,7 @@ func TestRepublish(t *testing.T) { t.Fatal(err) } - bsinf := core.BootstrapConfigWithPeers( + bsinf := bootstrap.BootstrapConfigWithPeers( []pstore.PeerInfo{ nodes[0].Peerstore.PeerInfo(nodes[0].Identity), }, diff --git a/test/integration/addcat_test.go b/test/integration/addcat_test.go index b0def746dae..98e6936ee82 100644 --- a/test/integration/addcat_test.go +++ b/test/integration/addcat_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/coreapi" mock "github.com/ipfs/go-ipfs/core/mock" "github.com/ipfs/go-ipfs/thirdparty/unit" @@ -140,10 +141,10 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { bs1 := []pstore.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} bs2 := []pstore.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} - if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + if err := catter.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs1)); err != nil { return err } - if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + if err := adder.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs2)); err != nil { return err } diff --git a/test/integration/bench_cat_test.go b/test/integration/bench_cat_test.go index e8a2322deed..a40fcfe37b1 100644 --- a/test/integration/bench_cat_test.go +++ b/test/integration/bench_cat_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/coreapi" mock "github.com/ipfs/go-ipfs/core/mock" "github.com/ipfs/go-ipfs/thirdparty/unit" @@ -83,10 +84,10 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error { bs1 := []pstore.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} bs2 := []pstore.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} - if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + if err := catter.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs1)); err != nil { return err } - if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + if err := adder.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs2)); err != nil { return err } diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index af440663353..30b8ce30d58 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/go-block-format" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core/mock" + "github.com/ipfs/go-ipfs/core/node" cid "github.com/ipfs/go-cid" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -26,7 +27,7 @@ func TestBitswapWithoutRouting(t *testing.T) { n, err := core.NewNode(ctx, &core.BuildCfg{ Online: true, Host: coremock.MockHostOption(mn), - Routing: core.NilRouterOption, // no routing + Routing: node.NilRouterOption, // no routing }) if err != nil { t.Fatal(err) diff --git a/test/integration/three_legged_cat_test.go b/test/integration/three_legged_cat_test.go index 953c7e37096..1fc0e7bf253 100644 --- a/test/integration/three_legged_cat_test.go +++ b/test/integration/three_legged_cat_test.go @@ -10,6 +10,7 @@ import ( "time" core "github.com/ipfs/go-ipfs/core" + bootstrap2 "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/coreapi" mock "github.com/ipfs/go-ipfs/core/mock" "github.com/ipfs/go-ipfs/thirdparty/unit" @@ -118,7 
+119,7 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { } bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) - bcfg := core.BootstrapConfigWithPeers([]pstore.PeerInfo{bis}) + bcfg := bootstrap2.BootstrapConfigWithPeers([]pstore.PeerInfo{bis}) if err := adder.Bootstrap(bcfg); err != nil { return err }
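
Reviewer sketch (not part of the diff): roughly how the option groups above are meant to compose. fx.New and fx.Provide are the real go.uber.org/fx API; the function below is illustrative only, and the actual assembly of these groups into an IpfsNode happens in the core package, not here.

// exampleApp shows, under the assumptions above, how a caller could wire the
// core/node groups into an fx application. The static inputs (repo, build
// config, metrics context) are supplied up front; everything else is resolved
// by the constructors registered in the groups.
func exampleApp(bcfg *BuildCfg, r repo.Repo, mctx MetricsCtx) *fx.App {
	return fx.New(
		// static inputs the constructors depend on
		fx.Provide(func() *BuildCfg { return bcfg }),
		fx.Provide(func() repo.Repo { return r }),
		fx.Provide(func() MetricsCtx { return mctx }),

		Storage,  // repo config, datastore, blockstores
		Identity, // peer ID, private key, peerstore
		IPNS,     // record validator

		fx.Provide(DagCtor),
		fx.Provide(Files),

		// Online pulls in LibP2P and Providers; Offline wires no-network stubs
		Networked(bcfg.Online),
	)
}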