diff --git a/engine/linksystem.go b/engine/linksystem.go index f51a9b5..a34903c 100644 --- a/engine/linksystem.go +++ b/engine/linksystem.go @@ -25,7 +25,9 @@ var ( // Creates the main engine linksystem. func (e *Engine) mkLinkSystem() ipld.LinkSystem { lsys := cidlink.DefaultLinkSystem() - lsys.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + + storageReadOpener := func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + // If link corresponds to schema.NoEntries return error immediately. if lnk == schema.NoEntries { return nil, errNoEntries @@ -149,6 +151,14 @@ func (e *Engine) mkLinkSystem() ipld.LinkSystem { return bytes.NewBuffer(val), nil } + + lsys.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + r, err := storageReadOpener(lctx, lnk) + if err != nil && e.options.storageReadOpenerErrorHook != nil { + return r, e.options.storageReadOpenerErrorHook(lctx, lnk, err) + } + return r, err + } lsys.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { buf := bytes.NewBuffer(nil) return buf, func(lnk ipld.Link) error { diff --git a/engine/options.go b/engine/options.go index 20d6b7f..e0a45a5 100644 --- a/engine/options.go +++ b/engine/options.go @@ -7,6 +7,7 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" _ "github.com/ipni/go-libipni/maurl" "github.com/ipni/index-provider/engine/chunker" "github.com/ipni/index-provider/engine/policy" @@ -102,6 +103,8 @@ type ( chunker chunker.NewChunkerFunc syncPolicy *policy.Policy + + storageReadOpenerErrorHook func(lctx ipld.LinkContext, lnk ipld.Link, err error) error } ) @@ -442,3 +445,13 @@ func WithExtraGossipData(extraData []byte) Option { return nil } } + +// WithStorageReadOpenerErrorHook allows the calling applicaiton to invoke a custom piece logic whenever a storage read opener error occurs. +// For example the calling application can delete corrupted / create a new advertisement if the datastore was corrupted for some reason. +// The calling application can return ipld.ErrNotFound{} to indicate IPNI that this advertisement should be skipped without halting processing of the rest of the chain. +func WithStorageReadOpenerErrorHook(hook func(ipld.LinkContext, ipld.Link, error) error) Option { + return func(o *options) error { + o.storageReadOpenerErrorHook = hook + return nil + } +}