diff --git a/go.sum b/go.sum index 2b30d8fc..8a1d98df 100644 --- a/go.sum +++ b/go.sum @@ -149,8 +149,6 @@ github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJ github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= -github.com/ipfs/go-car v0.0.4 h1:zLhxykvk4SFU4oIpgcIoiolVL3jqcK0hjqcQfUSs4dk= -github.com/ipfs/go-car v0.0.4/go.mod h1:eZX0EppfsvSQN8IsJnx57bheogWMgQjJVWU/fDA7ySQ= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -231,7 +229,6 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= -github.com/ipld/go-car v0.0.4/go.mod h1:kgp82Cpd0TgoxgCsr7yndUXMkMgNP/aAwVBtJtsNR6g= github.com/ipld/go-car v0.0.5-0.20200316204026-3e2cf7af0fab h1:+3Y6Jb3IBmG3t6e3r6TItnuciOaMOuGW7QIVEUa5vy4= github.com/ipld/go-car v0.0.5-0.20200316204026-3e2cf7af0fab/go.mod h1:yR5AsJ38xTwwgwGpbh60ICtdLPp5lGfuH28PAAzaEhM= github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785 h1:fASnkvtR+SmB2y453RxmDD3Uvd4LonVUgFGk9JoDaZs= diff --git a/shared_testutil/test_filestore.go b/shared_testutil/test_filestore.go new file mode 100644 index 00000000..c0a0f851 --- /dev/null +++ b/shared_testutil/test_filestore.go @@ -0,0 +1,176 @@ +package shared_testutil + +import ( + "bytes" + "errors" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/filestore" +) + +var TestErrNotFound = errors.New("file not found") +var TestErrTempFile = errors.New("temp file creation failed") + +// TestFileStoreParams are parameters for a test file store +type TestFileStoreParams struct { + Files []filestore.File + AvailableTempFiles []filestore.File + ExpectedDeletions []filestore.Path + ExpectedOpens []filestore.Path +} + +// TestFileStore is a mocked file store that can provide programmed returns +// and test expectations +type TestFileStore struct { + files []filestore.File + availableTempFiles []filestore.File + expectedDeletions map[filestore.Path]struct{} + expectedOpens map[filestore.Path]struct{} + deletedFiles map[filestore.Path]struct{} + openedFiles map[filestore.Path]struct{} +} + +// NewTestFileStore returns a new test file store from the given parameters +func NewTestFileStore(params TestFileStoreParams) *TestFileStore { + fs := &TestFileStore{ + files: params.Files, + availableTempFiles: params.AvailableTempFiles, + expectedDeletions: make(map[filestore.Path]struct{}), + expectedOpens: make(map[filestore.Path]struct{}), + deletedFiles: make(map[filestore.Path]struct{}), + openedFiles: make(map[filestore.Path]struct{}), + } + for _, path := range params.ExpectedDeletions { + fs.expectedDeletions[path] = struct{}{} + } + for _, path := range params.ExpectedOpens { + fs.expectedOpens[path] = struct{}{} + } + return fs +} + +// Open will open a 
file if it's in the file store +func (fs *TestFileStore) Open(p filestore.Path) (filestore.File, error) { + var foundFile filestore.File + for _, file := range fs.files { + if p == file.Path() { + foundFile = file + break + } + } + if foundFile == nil { + return nil, TestErrNotFound + } + fs.openedFiles[p] = struct{}{} + return foundFile, nil +} + +// Create is not implement +func (fs *TestFileStore) Create(p filestore.Path) (filestore.File, error) { + panic("not implemented") +} + +// Store is not implemented +func (fs *TestFileStore) Store(p filestore.Path, f filestore.File) (filestore.Path, error) { + panic("not implemented") +} + +// Delete will delete a file if it is in the file store +func (fs *TestFileStore) Delete(p filestore.Path) error { + var foundFile filestore.File + for i, file := range fs.files { + if p == file.Path() { + foundFile = file + fs.files[i] = fs.files[len(fs.files)-1] + fs.files[len(fs.files)-1] = nil + fs.files = fs.files[:len(fs.files)-1] + break + } + } + if foundFile == nil { + return TestErrNotFound + } + fs.deletedFiles[p] = struct{}{} + return nil +} + +// CreateTemp will create a temporary file from the provided set of temporary files +func (fs *TestFileStore) CreateTemp() (filestore.File, error) { + if len(fs.availableTempFiles) == 0 { + return nil, TestErrTempFile + } + var tempFile filestore.File + tempFile, fs.availableTempFiles = fs.availableTempFiles[0], fs.availableTempFiles[1:] + fs.files = append(fs.files, tempFile) + return tempFile, nil +} + +// VerifyExpectations will verify that the correct files were opened and deleted +func (fs *TestFileStore) VerifyExpectations(t *testing.T) { + require.Equal(t, fs.openedFiles, fs.expectedOpens) + require.Equal(t, fs.deletedFiles, fs.expectedDeletions) +} + +// TestFileParams are parameters for a test file +type TestFileParams struct { + Buffer *bytes.Buffer + Size int64 + Path filestore.Path +} + +// NewTestFile generates a mocked filestore.File that has programmed returns +func NewTestFile(params TestFileParams) *TestFile { + tf := &TestFile{ + Buffer: params.Buffer, + size: params.Size, + path: params.Path, + } + if tf.Buffer == nil { + tf.Buffer = new(bytes.Buffer) + } + if tf.size == 0 { + tf.size = rand.Int63() + } + if tf.path == filestore.Path("") { + buf := make([]byte, 16) + _, _ = rand.Read(buf) + tf.path = filestore.Path(buf) + } + return tf +} + +// TestFile is a mocked version of filestore.File with preset returns +// and a byte buffer for read/writes +type TestFile struct { + *bytes.Buffer + size int64 + path filestore.Path +} + +// Path returns the preset path +func (f *TestFile) Path() filestore.Path { + return f.path +} + +// OsPath is not implemented +func (f *TestFile) OsPath() filestore.OsPath { + panic("not implemented") +} + +// Size returns the preset size +func (f *TestFile) Size() int64 { + return f.size +} + +// Close does nothing +func (f *TestFile) Close() error { + return nil +} + +// Seek is not implemented +func (f *TestFile) Seek(offset int64, whence int) (int64, error) { + panic("not implemented") +} diff --git a/shared_testutil/test_ipld_tree.go b/shared_testutil/test_ipld_tree.go index b368b2e1..84f47211 100644 --- a/shared_testutil/test_ipld_tree.go +++ b/shared_testutil/test_ipld_tree.go @@ -8,6 +8,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + "github.com/ipld/go-car" "github.com/ipld/go-ipld-prime" // to register multicodec @@ -15,6 +16,8 @@ import ( "github.com/ipld/go-ipld-prime/fluent" ipldfree 
"github.com/ipld/go-ipld-prime/impl/free" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" ) // TestIPLDTree is a set of IPLD Data that forms a tree spread across some blocks @@ -130,3 +133,18 @@ func (tt TestIPLDTree) Get(c cid.Cid) (blocks.Block, error) { } return blocks.NewBlockWithCid(data, c) } + +// DumpToCar puts the tree into a car file, with user configured functions +func (tt TestIPLDTree) DumpToCar(out io.Writer, userOnNewCarBlocks ...car.OnNewCarBlockFunc) error { + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + node := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + ctx := context.Background() + sc := car.NewSelectiveCar(ctx, tt, []car.Dag{ + car.Dag{ + Root: tt.RootNodeLnk.(cidlink.Link).Cid, + Selector: node, + }, + }) + + return sc.Write(out, userOnNewCarBlocks...) +} diff --git a/shared_testutil/test_piecestore.go b/shared_testutil/test_piecestore.go index 6a5b488b..c0dfd635 100644 --- a/shared_testutil/test_piecestore.go +++ b/shared_testutil/test_piecestore.go @@ -13,25 +13,40 @@ import ( // TestPieceStore is piecestore who's query results are mocked type TestPieceStore struct { - piecesStubbed map[cid.Cid]piecestore.PieceInfo - piecesExpected map[cid.Cid]struct{} - piecesReceived map[cid.Cid]struct{} - cidInfosStubbed map[cid.Cid]piecestore.CIDInfo - cidInfosExpected map[cid.Cid]struct{} - cidInfosReceived map[cid.Cid]struct{} + addPieceBlockLocationsError error + addDealForPieceError error + piecesStubbed map[cid.Cid]piecestore.PieceInfo + piecesExpected map[cid.Cid]struct{} + piecesReceived map[cid.Cid]struct{} + cidInfosStubbed map[cid.Cid]piecestore.CIDInfo + cidInfosExpected map[cid.Cid]struct{} + cidInfosReceived map[cid.Cid]struct{} +} + +// TestPieceStoreParams sets parameters for a piece store +type TestPieceStoreParams struct { + AddDealForPieceError error + AddPieceBlockLocationsError error } var _ piecestore.PieceStore = &TestPieceStore{} // NewTestPieceStore creates a TestPieceStore func NewTestPieceStore() *TestPieceStore { + return NewTestPieceStoreWithParams(TestPieceStoreParams{}) +} + +// NewTestPieceStoreWithParams creates a TestPieceStore with the given parameters +func NewTestPieceStoreWithParams(params TestPieceStoreParams) *TestPieceStore { return &TestPieceStore{ - piecesStubbed: make(map[cid.Cid]piecestore.PieceInfo), - piecesExpected: make(map[cid.Cid]struct{}), - piecesReceived: make(map[cid.Cid]struct{}), - cidInfosStubbed: make(map[cid.Cid]piecestore.CIDInfo), - cidInfosExpected: make(map[cid.Cid]struct{}), - cidInfosReceived: make(map[cid.Cid]struct{}), + addDealForPieceError: params.AddDealForPieceError, + addPieceBlockLocationsError: params.AddPieceBlockLocationsError, + piecesStubbed: make(map[cid.Cid]piecestore.PieceInfo), + piecesExpected: make(map[cid.Cid]struct{}), + piecesReceived: make(map[cid.Cid]struct{}), + cidInfosStubbed: make(map[cid.Cid]piecestore.CIDInfo), + cidInfosExpected: make(map[cid.Cid]struct{}), + cidInfosReceived: make(map[cid.Cid]struct{}), } } @@ -75,14 +90,17 @@ func (tps *TestPieceStore) VerifyExpectations(t *testing.T) { require.Equal(t, tps.cidInfosExpected, tps.cidInfosReceived) } +// AddDealForPiece returns a preprogrammed error func (tps *TestPieceStore) AddDealForPiece(pieceCID cid.Cid, dealInfo piecestore.DealInfo) error { - panic("not implemented") + return tps.addDealForPieceError } +// AddPieceBlockLocations 
returns a preprogrammed error func (tps *TestPieceStore) AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error { - panic("not implemented") + return tps.addPieceBlockLocationsError } +// GetPieceInfo returns a piece info if it's been stubbed func (tps *TestPieceStore) GetPieceInfo(pieceCID cid.Cid) (piecestore.PieceInfo, error) { tps.piecesReceived[pieceCID] = struct{}{} @@ -97,6 +115,7 @@ func (tps *TestPieceStore) GetPieceInfo(pieceCID cid.Cid) (piecestore.PieceInfo, return piecestore.PieceInfoUndefined, errors.New("GetPieceInfo failed") } +// GetCIDInfo returns cid info if it's been stubbed func (tps *TestPieceStore) GetCIDInfo(c cid.Cid) (piecestore.CIDInfo, error) { tps.cidInfosReceived[c] = struct{}{} diff --git a/storagemarket/impl/blockrecorder/blockrecorder_test.go b/storagemarket/impl/blockrecorder/blockrecorder_test.go index 7694d900..a3c24bc6 100644 --- a/storagemarket/impl/blockrecorder/blockrecorder_test.go +++ b/storagemarket/impl/blockrecorder/blockrecorder_test.go @@ -5,73 +5,50 @@ import ( "context" "testing" - format "github.com/ipfs/go-ipld-format" - dag "github.com/ipfs/go-merkledag" - dstest "github.com/ipfs/go-merkledag/test" + blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-car" ipldfree "github.com/ipld/go-ipld-prime/impl/free" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" ) func TestBlockRecording(t *testing.T) { - - sourceBserv := dstest.Bserv() - sourceBs := sourceBserv.Blockstore() - - dserv := dag.NewDAGService(sourceBserv) - a := dag.NewRawNode([]byte("aaaa")) - b := dag.NewRawNode([]byte("bbbb")) - c := dag.NewRawNode([]byte("cccc")) - - nd1 := &dag.ProtoNode{} - _ = nd1.AddNodeLink("cat", a) - - nd2 := &dag.ProtoNode{} - _ = nd2.AddNodeLink("first", nd1) - _ = nd2.AddNodeLink("dog", b) - - nd3 := &dag.ProtoNode{} - _ = nd3.AddNodeLink("second", nd2) - _ = nd3.AddNodeLink("bear", c) - - ctx := context.Background() - _ = dserv.Add(ctx, a) - _ = dserv.Add(ctx, b) - _ = dserv.Add(ctx, c) - _ = dserv.Add(ctx, nd1) - _ = dserv.Add(ctx, nd2) - _ = dserv.Add(ctx, nd3) - + testData := shared_testutil.NewTestIPLDTree() ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) node := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { - efsb.Insert("Links", - ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) + efsb.Insert("linkedMap", + ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))) }).Node() - sc := car.NewSelectiveCar(ctx, sourceBs, []car.Dag{ + ctx := context.Background() + sc := car.NewSelectiveCar(ctx, testData, []car.Dag{ car.Dag{ - Root: nd3.Cid(), + Root: testData.RootNodeLnk.(cidlink.Link).Cid, Selector: node, }, }) carBuf := new(bytes.Buffer) blockLocationBuf := new(bytes.Buffer) - sc.Write(carBuf, blockrecorder.RecordEachBlockTo(blockLocationBuf)) + err := sc.Write(carBuf, blockrecorder.RecordEachBlockTo(blockLocationBuf)) + require.NoError(t, err) metadata, err := blockrecorder.ReadBlockMetadata(blockLocationBuf) require.NoError(t, err) - nds := []format.Node{ - a, b, nd1, nd2, nd3, + blks := []blocks.Block{ + testData.LeafAlphaBlock, + testData.MiddleMapBlock, + 
testData.RootBlock, } carBytes := carBuf.Bytes() - for _, nd := range nds { - cid := nd.Cid() + for _, blk := range blks { + cid := blk.Cid() var found bool var metadatum blockrecorder.PieceBlockMetadata for _, testMetadatum := range metadata { @@ -83,6 +60,21 @@ func TestBlockRecording(t *testing.T) { } require.True(t, found) testBuf := carBytes[metadatum.Offset : metadatum.Offset+metadatum.Size] - require.Equal(t, nd.RawData(), testBuf) + require.Equal(t, blk.RawData(), testBuf) + } + missingBlks := []blocks.Block{ + testData.LeafBetaBlock, + testData.MiddleListBlock, + } + for _, blk := range missingBlks { + cid := blk.Cid() + var found bool + for _, testMetadatum := range metadata { + if testMetadatum.CID.Equals(cid) { + found = true + break + } + } + require.False(t, found) } } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index cb42f0c0..c6aba1b9 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -32,24 +32,38 @@ import ( var ProviderDsPrefix = "/deals/provider" var _ storagemarket.StorageProvider = &Provider{} +// Provider is a storage provider implementation type Provider struct { net network.StorageMarketNetwork proofType abi.RegisteredProof - spn storagemarket.StorageProviderNode - fs filestore.FileStore - pio pieceio.PieceIOWithStore - pieceStore piecestore.PieceStore - conns *connmanager.ConnManager - storedAsk *storedask.StoredAsk - actor address.Address - dataTransfer datatransfer.Manager + spn storagemarket.StorageProviderNode + fs filestore.FileStore + pio pieceio.PieceIOWithStore + pieceStore piecestore.PieceStore + conns *connmanager.ConnManager + storedAsk *storedask.StoredAsk + actor address.Address + dataTransfer datatransfer.Manager + universalRetrievalEnabled bool deals fsm.Group } -func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address, rt abi.RegisteredProof) (storagemarket.StorageProvider, error) { +// StorageProviderOption allows custom configuration of a storage provider +type StorageProviderOption func(p *Provider) + +// EnableUniversalRetrieval causes a storage provider to track all CIDs in a piece, +// so that any CID, not just the root, can be retrieved +func EnableUniversalRetrieval() StorageProviderOption { + return func(p *Provider) { + p.universalRetrievalEnabled = true + } +} + +// NewProvider returns a new storage provider +func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address, rt abi.RegisteredProof, options ...StorageProviderOption) (storagemarket.StorageProvider, error) { carIO := cario.NewCarIO() pio := pieceio.NewPieceIOWithStore(carIO, fs, bs) @@ -84,6 +98,10 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo h.deals = deals + for _, option := range options { + option(h) + } + // register a data transfer event handler -- this will move deals from // accepted to staged dataTransfer.SubscribeToEvents(providerutils.DataTransferSubscriber(deals)) @@ -160,31 +178,41 @@ func (p *Provider) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data if err != nil { return xerrors.Errorf("failed to create temp file for data import: %w", err) } + cleanup 
:= func() { + _ = tempfi.Close() + _ = p.fs.Delete(tempfi.Path()) + } n, err := io.Copy(tempfi, data) if err != nil { + cleanup() return xerrors.Errorf("importing deal data failed: %w", err) } + _ = n // TODO: verify n? pieceSize := uint64(tempfi.Size()) _, err = tempfi.Seek(0, io.SeekStart) if err != nil { + cleanup() return xerrors.Errorf("failed to seek through temp imported file: %w", err) } pieceCid, _, err := pieceio.GeneratePieceCommitment(p.proofType, tempfi, pieceSize) if err != nil { + cleanup() return xerrors.Errorf("failed to generate commP") } // Verify CommP matches if !pieceCid.Equals(d.Proposal.PieceCID) { + cleanup() return xerrors.Errorf("given data does not match expected commP (got: %x, expected %x)", pieceCid, d.Proposal.PieceCID) } - return p.deals.Send(propCid, storagemarket.ProviderEventVerifiedData, tempfi.Path()) + return p.deals.Send(propCid, storagemarket.ProviderEventVerifiedData, tempfi.Path(), filestore.Path("")) + } func (p *Provider) ListAsks(addr address.Address) []*storagemarket.SignedStorageAsk { @@ -262,25 +290,20 @@ func (p *providerDealEnvironment) StartDataTransfer(ctx context.Context, to peer return err } -func (p *providerDealEnvironment) GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, error) { - pieceCid, path, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector) - return pieceCid, path, err -} - -func (p *providerDealEnvironment) OpenFile(path filestore.Path) (filestore.File, error) { - return p.p.fs.Open(path) -} - -func (p *providerDealEnvironment) DeleteFile(path filestore.Path) error { - return p.p.fs.Delete(path) +func (p *providerDealEnvironment) GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) { + if p.p.universalRetrievalEnabled { + return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector) + } + pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector) + return pieceCid, piecePath, filestore.Path(""), err } -func (p *providerDealEnvironment) AddDealForPiece(pieceCID cid.Cid, dealInfo piecestore.DealInfo) error { - return p.p.pieceStore.AddDealForPiece(pieceCID, dealInfo) +func (p *providerDealEnvironment) FileStore() filestore.FileStore { + return p.p.fs } -func (p *providerDealEnvironment) AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error { - return p.p.pieceStore.AddPieceBlockLocations(pieceCID, blockLocations) +func (p *providerDealEnvironment) PieceStore() piecestore.PieceStore { + return p.p.pieceStore } func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, resp *network.Response) error { @@ -315,3 +338,5 @@ func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, resp * func (p *providerDealEnvironment) Disconnect(proposalCid cid.Cid) error { return p.p.conns.Disconnect(proposalCid) } + +var _ providerstates.ProviderDealEnvironment = &providerDealEnvironment{} diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index 7c471b25..e908ba60 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -45,8 +45,9 @@ var ProviderEvents = fsm.Events{ }), fsm.Event(storagemarket.ProviderEventVerifiedData). 
FromMany(storagemarket.StorageDealVerifyData, storagemarket.StorageDealWaitingForData).To(storagemarket.StorageDealPublishing). - Action(func(deal *storagemarket.MinerDeal, path filestore.Path) error { + Action(func(deal *storagemarket.MinerDeal, path filestore.Path, metadataPath filestore.Path) error { deal.PiecePath = path + deal.MetadataPath = metadataPath return nil }), fsm.Event(storagemarket.ProviderEventSendResponseFailed). @@ -58,39 +59,45 @@ var ProviderEvents = fsm.Events{ fsm.Event(storagemarket.ProviderEventDealPublished). From(storagemarket.StorageDealPublishing).To(storagemarket.StorageDealStaged). Action(func(deal *storagemarket.MinerDeal, dealID abi.DealID) error { + deal.ConnectionClosed = true deal.DealID = dealID return nil }), fsm.Event(storagemarket.ProviderEventFileStoreErrored). - FromMany(storagemarket.StorageDealStaged, storagemarket.StorageDealSealing, storagemarket.StorageDealActive).To(storagemarket.StorageDealError). + FromMany(storagemarket.StorageDealStaged, storagemarket.StorageDealSealing, storagemarket.StorageDealActive).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.MinerDeal, err error) error { deal.Message = xerrors.Errorf("accessing file store: %w", err).Error() return nil }), - fsm.Event(storagemarket.ProviderEventDealHandoffFailed).From(storagemarket.StorageDealStaged).To(storagemarket.StorageDealError). + fsm.Event(storagemarket.ProviderEventDealHandoffFailed).From(storagemarket.StorageDealStaged).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.MinerDeal, err error) error { deal.Message = xerrors.Errorf("handing off deal to node: %w", err).Error() return nil }), fsm.Event(storagemarket.ProviderEventDealHandedOff).From(storagemarket.StorageDealStaged).To(storagemarket.StorageDealSealing), fsm.Event(storagemarket.ProviderEventDealActivationFailed). - From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealError). + From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.MinerDeal, err error) error { deal.Message = xerrors.Errorf("error activating deal: %w", err).Error() return nil }), fsm.Event(storagemarket.ProviderEventDealActivated).From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealActive), - fsm.Event(storagemarket.ProviderEventPieceStoreErrored).From(storagemarket.StorageDealActive).To(storagemarket.StorageDealError). + fsm.Event(storagemarket.ProviderEventPieceStoreErrored).From(storagemarket.StorageDealActive).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.MinerDeal, err error) error { deal.Message = xerrors.Errorf("accessing piece store: %w", err).Error() return nil }), fsm.Event(storagemarket.ProviderEventDealCompleted).From(storagemarket.StorageDealActive).To(storagemarket.StorageDealCompleted), - fsm.Event(storagemarket.ProviderEventUnableToLocatePiece).From(storagemarket.StorageDealActive).To(storagemarket.StorageDealError). + fsm.Event(storagemarket.ProviderEventUnableToLocatePiece).From(storagemarket.StorageDealActive).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.MinerDeal, dealID abi.DealID, err error) error { deal.Message = xerrors.Errorf("locating piece for deal ID %d in sector: %w", deal.DealID, err).Error() return nil }), + fsm.Event(storagemarket.ProviderEventReadMetadataErrored).From(storagemarket.StorageDealActive).To(storagemarket.StorageDealFailing). 
+ Action(func(deal *storagemarket.MinerDeal, err error) error { + deal.Message = xerrors.Errorf("error reading piece metadata: %w", err).Error() + return nil + }), fsm.Event(storagemarket.ProviderEventFailed).From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError), } diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 9384cd53..e15a6356 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -35,13 +35,11 @@ type ProviderDealEnvironment interface { Node() storagemarket.StorageProviderNode Ask() storagemarket.StorageAsk StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error - GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, error) + GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) SendSignedResponse(ctx context.Context, response *network.Response) error Disconnect(proposalCid cid.Cid) error - OpenFile(path filestore.Path) (filestore.File, error) - DeleteFile(path filestore.Path) error - AddDealForPiece(pieceCID cid.Cid, dealInfo piecestore.DealInfo) error - AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error + FileStore() filestore.FileStore + PieceStore() piecestore.PieceStore } // ProviderStateEntryFunc is the signature for a StateEntryFunc in the provider FSM @@ -139,7 +137,7 @@ func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal stora allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() - pieceCid, path, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, allSelector) + pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, allSelector) if err != nil { return ctx.Trigger(storagemarket.ProviderEventGeneratePieceCIDFailed, err) } @@ -149,7 +147,7 @@ func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal stora return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP")) } - return ctx.Trigger(storagemarket.ProviderEventVerifiedData, path) + return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath) } // PublishDeal publishes a deal on chain and sends the deal id back to the client @@ -202,7 +200,7 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor // HandoffDeal hands off a published deal for sealing and commitment in a sector func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - file, err := environment.OpenFile(deal.PiecePath) + file, err := environment.FileStore().Open(deal.PiecePath) if err != nil { return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err)) } @@ -249,24 +247,31 @@ func VerifyDealActivated(ctx fsm.Context, environment ProviderDealEnvironment, d // RecordPieceInfo records sector information about an activated deal so that the data // can be retrieved later func RecordPieceInfo(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - err := environment.DeleteFile(deal.PiecePath) - if err != nil { - return 
ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("deleting piece at path %s: %w", deal.PiecePath, err)) - } sectorID, offset, length, err := environment.Node().LocatePieceForDealWithinSector(ctx.Context(), deal.DealID) if err != nil { return ctx.Trigger(storagemarket.ProviderEventUnableToLocatePiece, deal.DealID, err) } + + var blockLocations map[cid.Cid]piecestore.BlockLocation + if deal.MetadataPath != filestore.Path("") { + blockLocations, err = providerutils.LoadBlockLocations(environment.FileStore(), deal.MetadataPath) + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventReadMetadataErrored, err) + } + } else { + blockLocations = map[cid.Cid]piecestore.BlockLocation{ + deal.Ref.Root: {}, + } + } + // TODO: Record actual block locations for all CIDs in piece by improving car writing - err = environment.AddPieceBlockLocations(deal.Proposal.PieceCID, map[cid.Cid]piecestore.BlockLocation{ - deal.Ref.Root: {}, - }) + err = environment.PieceStore().AddPieceBlockLocations(deal.Proposal.PieceCID, blockLocations) if err != nil { return ctx.Trigger(storagemarket.ProviderEventPieceStoreErrored, xerrors.Errorf("adding piece block locations: %w", err)) } - err = environment.AddDealForPiece(deal.Proposal.PieceCID, piecestore.DealInfo{ + err = environment.PieceStore().AddDealForPiece(deal.Proposal.PieceCID, piecestore.DealInfo{ DealID: deal.DealID, SectorID: sectorID, Offset: offset, @@ -277,6 +282,17 @@ func RecordPieceInfo(ctx fsm.Context, environment ProviderDealEnvironment, deal return ctx.Trigger(storagemarket.ProviderEventPieceStoreErrored, xerrors.Errorf("adding deal info for piece: %w", err)) } + err = environment.FileStore().Delete(deal.PiecePath) + if err != nil { + log.Warnf("deleting piece at path %s: %w", deal.PiecePath, err) + } + if deal.MetadataPath != filestore.Path("") { + err := environment.FileStore().Delete(deal.MetadataPath) + if err != nil { + log.Warnf("deleting piece at path %s: %w", deal.MetadataPath, err) + } + } + return ctx.Trigger(storagemarket.ProviderEventDealCompleted) } @@ -285,19 +301,33 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage log.Warnf("deal %s failed: %s", deal.ProposalCid, deal.Message) - err := environment.SendSignedResponse(ctx.Context(), &network.Response{ - State: storagemarket.StorageDealFailing, - Message: deal.Message, - Proposal: deal.ProposalCid, - }) + if !deal.ConnectionClosed { + err := environment.SendSignedResponse(ctx.Context(), &network.Response{ + State: storagemarket.StorageDealFailing, + Message: deal.Message, + Proposal: deal.ProposalCid, + }) - if err != nil { - return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) - } + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) + } - if err := environment.Disconnect(deal.ProposalCid); err != nil { - log.Warnf("closing client connection: %+v", err) + if err := environment.Disconnect(deal.ProposalCid); err != nil { + log.Warnf("closing client connection: %+v", err) + } } + if deal.PiecePath != filestore.Path("") { + err := environment.FileStore().Delete(deal.PiecePath) + if err != nil { + log.Warnf("deleting piece at path %s: %w", deal.PiecePath, err) + } + } + if deal.MetadataPath != filestore.Path("") { + err := environment.FileStore().Delete(deal.MetadataPath) + if err != nil { + log.Warnf("deleting piece at path %s: %w", deal.MetadataPath, err) + } + } return ctx.Trigger(storagemarket.ProviderEventFailed) } diff --git 
a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index aae53347..4610e170 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -1,8 +1,10 @@ package providerstates_test import ( + "bytes" "context" "errors" + "fmt" "math/rand" "testing" @@ -22,6 +24,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" @@ -38,6 +41,8 @@ func TestValidateDealProposal(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { @@ -120,7 +125,7 @@ func TestValidateDealProposal(t *testing.T) { } for test, data := range tests { t.Run(test, func(t *testing.T) { - runValidateDealProposal(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runValidateDealProposal(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -134,6 +139,8 @@ func TestTransferData(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { @@ -163,7 +170,7 @@ func TestTransferData(t *testing.T) { } for test, data := range tests { t.Run(test, func(t *testing.T) { - runTransferData(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runTransferData(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -173,20 +180,25 @@ func TestVerifyData(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) require.NoError(t, err) expPath := filestore.Path("applesauce.txt") + expMetaPath := filestore.Path("somemetadata.txt") runVerifyData := makeExecutor(ctx, eventProcessor, providerstates.VerifyData, storagemarket.StorageDealVerifyData) tests := map[string]struct { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { environmentParams: environmentParams{ - Path: expPath, + Path: expPath, + MetadataPath: expMetaPath, }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, deal.State, storagemarket.StorageDealPublishing) require.Equal(t, deal.PiecePath, expPath) + require.Equal(t, deal.MetadataPath, expMetaPath) }, }, "generate piece CID fails": { @@ -210,7 +222,7 @@ func TestVerifyData(t *testing.T) { } for test, data := range tests { t.Run(test, func(t *testing.T) { - runVerifyData(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + 
runVerifyData(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -225,6 +237,8 @@ func TestPublishDeal(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { @@ -234,6 +248,7 @@ func TestPublishDeal(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, deal.State, storagemarket.StorageDealStaged) require.Equal(t, deal.DealID, expDealID) + require.Equal(t, deal.ConnectionClosed, true) }, }, "get miner worker fails": { @@ -275,7 +290,7 @@ func TestPublishDeal(t *testing.T) { } for test, data := range tests { t.Run(test, func(t *testing.T) { - runPublishDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runPublishDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -289,9 +304,18 @@ func TestHandoffDeal(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { + dealParams: dealParams{ + PiecePath: defaultPath, + }, + fileStoreParams: tut.TestFileStoreParams{ + Files: []filestore.File{defaultDataFile}, + ExpectedOpens: []filestore.Path{defaultPath}, + }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, deal.State, storagemarket.StorageDealSealing) }, @@ -300,27 +324,31 @@ func TestHandoffDeal(t *testing.T) { dealParams: dealParams{ PiecePath: filestore.Path("missing.txt"), }, - environmentParams: environmentParams{ - OpenFileError: errors.New("file not found"), - }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "accessing file store: reading piece at path missing.txt: file not found") + require.Equal(t, deal.State, storagemarket.StorageDealFailing) + require.Equal(t, deal.Message, fmt.Sprintf("accessing file store: reading piece at path missing.txt: %s", tut.TestErrNotFound.Error())) }, }, "OnDealComplete errors": { + dealParams: dealParams{ + PiecePath: defaultPath, + }, + fileStoreParams: tut.TestFileStoreParams{ + Files: []filestore.File{defaultDataFile}, + ExpectedOpens: []filestore.Path{defaultPath}, + }, nodeParams: nodeParams{ OnDealCompleteError: errors.New("failed building sector"), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, deal.State, storagemarket.StorageDealFailing) require.Equal(t, deal.Message, "handing off deal to node: failed building sector") }, }, } for test, data := range tests { t.Run(test, func(t *testing.T) { - runHandoffDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runHandoffDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -334,6 +362,8 @@ func TestVerifyDealActivated(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams 
tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { @@ -346,7 +376,7 @@ func TestVerifyDealActivated(t *testing.T) { DealCommittedSyncError: errors.New("couldn't check deal commitment"), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, deal.State, storagemarket.StorageDealFailing) require.Equal(t, deal.Message, "error activating deal: couldn't check deal commitment") }, }, @@ -355,14 +385,14 @@ func TestVerifyDealActivated(t *testing.T) { DealCommittedAsyncError: errors.New("deal did not appear on chain"), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, deal.State, storagemarket.StorageDealFailing) require.Equal(t, deal.Message, "error activating deal: deal did not appear on chain") }, }, } for test, data := range tests { t.Run(test, func(t *testing.T) { - runVerifyDealActivated(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runVerifyDealActivated(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -376,23 +406,34 @@ func TestRecordPieceInfo(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { + dealParams: dealParams{ + PiecePath: defaultPath, + }, + fileStoreParams: tut.TestFileStoreParams{ + Files: []filestore.File{defaultDataFile}, + ExpectedDeletions: []filestore.Path{defaultPath}, + }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, deal.State, storagemarket.StorageDealCompleted) }, }, - "file deletion errors": { + "succeeds w metadata": { dealParams: dealParams{ - PiecePath: filestore.Path("readonly.txt"), + PiecePath: defaultPath, + MetadataPath: defaultMetadataPath, }, - environmentParams: environmentParams{ - DeleteFileError: errors.New("file is read only"), + fileStoreParams: tut.TestFileStoreParams{ + Files: []filestore.File{defaultDataFile, defaultMetadataFile}, + ExpectedOpens: []filestore.Path{defaultMetadataPath}, + ExpectedDeletions: []filestore.Path{defaultMetadataPath, defaultPath}, }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "accessing file store: deleting piece at path readonly.txt: file is read only") + require.Equal(t, deal.State, storagemarket.StorageDealCompleted) }, }, "locate piece fails": { @@ -403,32 +444,41 @@ func TestRecordPieceInfo(t *testing.T) { LocatePieceForDealWithinSectorError: errors.New("could not find piece"), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, deal.State, storagemarket.StorageDealFailing) require.Equal(t, deal.Message, "locating piece for deal ID 1234 in sector: could not find piece") }, }, + "reading metadata fails": { + dealParams: dealParams{ + MetadataPath: filestore.Path("Missing.txt"), + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, deal.State, storagemarket.StorageDealFailing) + require.Equal(t, deal.Message, fmt.Sprintf("error reading piece metadata: 
%s", tut.TestErrNotFound.Error())) + }, + }, "add piece block locations errors": { - environmentParams: environmentParams{ + pieceStoreParams: tut.TestPieceStoreParams{ AddPieceBlockLocationsError: errors.New("could not add block locations"), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, deal.State, storagemarket.StorageDealFailing) require.Equal(t, deal.Message, "accessing piece store: adding piece block locations: could not add block locations") }, }, "add deal for piece errors": { - environmentParams: environmentParams{ + pieceStoreParams: tut.TestPieceStoreParams{ AddDealForPieceError: errors.New("could not add deal info"), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, deal.State, storagemarket.StorageDealFailing) require.Equal(t, deal.Message, "accessing piece store: adding deal info for piece: could not add deal info") }, }, } for test, data := range tests { t.Run(test, func(t *testing.T) { - runRecordPieceInfo(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runRecordPieceInfo(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -442,6 +492,8 @@ func TestFailDeal(t *testing.T) { nodeParams nodeParams dealParams dealParams environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { @@ -449,6 +501,34 @@ func TestFailDeal(t *testing.T) { require.Equal(t, deal.State, storagemarket.StorageDealError) }, }, + "succeeds, skips response": { + environmentParams: environmentParams{ + // no send response should happen, so this error should not prevent + // success + SendSignedResponseError: errors.New("could not send"), + }, + dealParams: dealParams{ + ConnectionClosed: true, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, deal.State, storagemarket.StorageDealError) + // should not have additional error message + require.Equal(t, deal.Message, "") + }, + }, + "succeeds, file deletions": { + dealParams: dealParams{ + PiecePath: defaultPath, + MetadataPath: defaultMetadataPath, + }, + fileStoreParams: tut.TestFileStoreParams{ + Files: []filestore.File{defaultDataFile, defaultMetadataFile}, + ExpectedDeletions: []filestore.Path{defaultPath, defaultMetadataPath}, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, deal.State, storagemarket.StorageDealError) + }, + }, "SendSignedResponse errors": { environmentParams: environmentParams{ SendSignedResponseError: errors.New("could not send"), @@ -461,7 +541,7 @@ func TestFailDeal(t *testing.T) { } for test, data := range tests { t.Run(test, func(t *testing.T) { - runFailDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.dealInspector) + runFailDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -475,6 +555,7 @@ var defaultStartEpoch = abi.ChainEpoch(200) var defaultEndEpoch = abi.ChainEpoch(400) var defaultPieceCid = tut.GenerateCids(1)[0] var defaultPath = filestore.Path("file.txt") +var defaultMetadataPath = filestore.Path("metadataPath.txt") var defaultClientAddress = address.TestAddress var 
defaultProviderAddress = address.TestAddress2 var defaultMinerAddr, _ = address.NewActorAddress([]byte("miner")) @@ -491,6 +572,21 @@ var defaultAsk = storagemarket.StorageAsk{ MinPieceSize: abi.PaddedPieceSize(256), } +var testData = tut.NewTestIPLDTree() +var dataBuf = new(bytes.Buffer) +var blockLocationBuf = new(bytes.Buffer) +var _ error = testData.DumpToCar(dataBuf, blockrecorder.RecordEachBlockTo(blockLocationBuf)) +var defaultDataFile = tut.NewTestFile(tut.TestFileParams{ + Buffer: dataBuf, + Path: defaultPath, + Size: 400, +}) +var defaultMetadataFile = tut.NewTestFile(tut.TestFileParams{ + Buffer: blockLocationBuf, + Path: defaultMetadataPath, + Size: 400, +}) + type nodeParams struct { MinerAddr address.Address MinerWorkerError error @@ -513,6 +609,8 @@ type nodeParams struct { type dealParams struct { PiecePath filestore.Path + MetadataPath filestore.Path + ConnectionClosed bool DealID abi.DealID DataRef *storagemarket.DataRef StoragePricePerEpoch abi.TokenAmount @@ -522,25 +620,23 @@ type dealParams struct { } type environmentParams struct { - Address address.Address - Ask storagemarket.StorageAsk - DataTransferError error - PieceCid cid.Cid - Path filestore.Path - GenerateCommPError error - SendSignedResponseError error - DisconnectError error - File filestore.File - OpenFileError error - DeleteFileError error - AddDealForPieceError error - AddPieceBlockLocationsError error + Address address.Address + Ask storagemarket.StorageAsk + DataTransferError error + PieceCid cid.Cid + Path filestore.Path + MetadataPath filestore.Path + GenerateCommPError error + SendSignedResponseError error + DisconnectError error } type executor func(t *testing.T, node nodeParams, params environmentParams, dealParams dealParams, + fileStoreParams tut.TestFileStoreParams, + pieceStoreParams tut.TestPieceStoreParams, dealInspector func(t *testing.T, deal storagemarket.MinerDeal)) func makeExecutor(ctx context.Context, @@ -551,6 +647,8 @@ func makeExecutor(ctx context.Context, nodeParams nodeParams, params environmentParams, dealParams dealParams, + fileStoreParams tut.TestFileStoreParams, + pieceStoreParams tut.TestPieceStoreParams, dealInspector func(t *testing.T, deal storagemarket.MinerDeal)) { smstate := testnodes.NewStorageMarketState() @@ -630,23 +728,30 @@ func makeExecutor(ctx context.Context, if dealParams.PiecePath != filestore.Path("") { dealState.PiecePath = dealParams.PiecePath } + if dealParams.MetadataPath != filestore.Path("") { + dealState.MetadataPath = dealParams.MetadataPath + } + if dealParams.ConnectionClosed { + dealState.ConnectionClosed = true + } if dealParams.DealID != abi.DealID(0) { dealState.DealID = dealParams.DealID } + fs := tut.NewTestFileStore(fileStoreParams) + pieceStore := tut.NewTestPieceStoreWithParams(pieceStoreParams) environment := &fakeEnvironment{ - address: params.Address, - node: node, - ask: params.Ask, - dataTransferError: params.DataTransferError, - pieceCid: params.PieceCid, - path: params.Path, - generateCommPError: params.GenerateCommPError, - sendSignedResponseError: params.SendSignedResponseError, - disconnectError: params.DisconnectError, - openFileError: params.OpenFileError, - deleteFileError: params.DeleteFileError, - addDealForPieceError: params.AddDealForPieceError, - addPieceBlockLocationsError: params.AddPieceBlockLocationsError, + address: params.Address, + node: node, + ask: params.Ask, + dataTransferError: params.DataTransferError, + pieceCid: params.PieceCid, + path: params.Path, + metadataPath: params.MetadataPath, + 
generateCommPError: params.GenerateCommPError, + sendSignedResponseError: params.SendSignedResponseError, + disconnectError: params.DisconnectError, + fs: fs, + pieceStore: pieceStore, } if environment.pieceCid == cid.Undef { environment.pieceCid = defaultPieceCid @@ -654,6 +759,9 @@ func makeExecutor(ctx context.Context, if environment.path == filestore.Path("") { environment.path = defaultPath } + if environment.metadataPath == filestore.Path("") { + environment.metadataPath = defaultMetadataPath + } if environment.address == address.Undef { environment.address = defaultProviderAddress } @@ -666,23 +774,24 @@ func makeExecutor(ctx context.Context, require.NoError(t, err) fsmCtx.ReplayEvents(t, dealState) dealInspector(t, *dealState) + fs.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) } } type fakeEnvironment struct { - address address.Address - node storagemarket.StorageProviderNode - ask storagemarket.StorageAsk - dataTransferError error - pieceCid cid.Cid - path filestore.Path - generateCommPError error - sendSignedResponseError error - disconnectError error - openFileError error - deleteFileError error - addDealForPieceError error - addPieceBlockLocationsError error + address address.Address + node storagemarket.StorageProviderNode + ask storagemarket.StorageAsk + dataTransferError error + pieceCid cid.Cid + path filestore.Path + metadataPath filestore.Path + generateCommPError error + sendSignedResponseError error + disconnectError error + fs filestore.FileStore + pieceStore piecestore.PieceStore } func (fe *fakeEnvironment) Address() address.Address { @@ -701,8 +810,8 @@ func (fe *fakeEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, vo return fe.dataTransferError } -func (fe *fakeEnvironment) GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, error) { - return fe.pieceCid, fe.path, fe.generateCommPError +func (fe *fakeEnvironment) GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) { + return fe.pieceCid, fe.path, fe.metadataPath, fe.generateCommPError } func (fe *fakeEnvironment) SendSignedResponse(ctx context.Context, response *network.Response) error { @@ -713,49 +822,10 @@ func (fe *fakeEnvironment) Disconnect(proposalCid cid.Cid) error { return fe.disconnectError } -func (fe *fakeEnvironment) OpenFile(path filestore.Path) (filestore.File, error) { - return &fakeFile{}, fe.openFileError -} - -func (fe *fakeEnvironment) DeleteFile(path filestore.Path) error { - return fe.deleteFileError -} - -func (fe *fakeEnvironment) AddDealForPiece(pieceCID cid.Cid, dealInfo piecestore.DealInfo) error { - return fe.addDealForPieceError -} - -func (fe *fakeEnvironment) AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error { - return fe.addPieceBlockLocationsError -} - -type fakeFile struct { -} - -func (f *fakeFile) Path() filestore.Path { - panic("not implemented") -} - -func (f *fakeFile) OsPath() filestore.OsPath { - panic("not implemented") -} - -func (f *fakeFile) Size() int64 { - return 400 -} - -func (f *fakeFile) Close() error { - panic("not implemented") -} - -func (f *fakeFile) Read(p []byte) (n int, err error) { - panic("not implemented") -} - -func (f *fakeFile) Write(p []byte) (n int, err error) { - panic("not implemented") +func (fe *fakeEnvironment) FileStore() filestore.FileStore { + return fe.fs } -func (f *fakeFile) Seek(offset int64, whence int) (int64, error) { - panic("not implemented") +func 
(fe *fakeEnvironment) PieceStore() piecestore.PieceStore { + return fe.pieceStore } diff --git a/storagemarket/impl/providerutils/providerutils.go b/storagemarket/impl/providerutils/providerutils.go index 88814ea9..99ac59fd 100644 --- a/storagemarket/impl/providerutils/providerutils.go +++ b/storagemarket/impl/providerutils/providerutils.go @@ -8,13 +8,20 @@ import ( cborutil "github.com/filecoin-project/go-cbor-util" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statemachine/fsm" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-car" + "github.com/ipld/go-ipld-prime" "golang.org/x/xerrors" + "github.com/filecoin-project/go-fil-markets/filestore" + "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" ) @@ -100,3 +107,45 @@ func DataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { } } } + +// CommPGenerator is a commP generating function that writes to a file +type CommPGenerator func(abi.RegisteredProof, cid.Cid, ipld.Node, ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error) + +// GeneratePieceCommitmentWithMetadata generates a piece commitment along with block metadata +func GeneratePieceCommitmentWithMetadata( + fileStore filestore.FileStore, + commPGenerator CommPGenerator, + proofType abi.RegisteredProof, + payloadCid cid.Cid, + selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) { + metadataFile, err := fileStore.CreateTemp() + if err != nil { + return cid.Cid{}, "", "", err + } + blockRecorder := blockrecorder.RecordEachBlockTo(metadataFile) + pieceCid, path, _, err := commPGenerator(proofType, payloadCid, selector, blockRecorder) + _ = metadataFile.Close() + if err != nil { + _ = fileStore.Delete(metadataFile.Path()) + return cid.Cid{}, "", "", err + } + return pieceCid, path, metadataFile.Path(), err +} + +// LoadBlockLocations loads a metadata file then converts it to a map of cid -> blockLocation +func LoadBlockLocations(fs filestore.FileStore, metadataPath filestore.Path) (map[cid.Cid]piecestore.BlockLocation, error) { + metadataFile, err := fs.Open(metadataPath) + if err != nil { + return nil, err + } + metadata, err := blockrecorder.ReadBlockMetadata(metadataFile) + _ = metadataFile.Close() + if err != nil { + return nil, err + } + blockLocations := make(map[cid.Cid]piecestore.BlockLocation, len(metadata)) + for _, metadatum := range metadata { + blockLocations[metadatum.CID] = piecestore.BlockLocation{RelOffset: metadatum.Offset, BlockSize: metadatum.Size} + } + return blockLocations, nil +} diff --git a/storagemarket/impl/providerutils/providerutils_test.go b/storagemarket/impl/providerutils/providerutils_test.go index e6c893e4..ca3b9588 100644 --- a/storagemarket/impl/providerutils/providerutils_test.go +++ b/storagemarket/impl/providerutils/providerutils_test.go @@ -1,22 +1,31 @@ package providerutils_test import ( + "bytes" "context" "errors" + "math/rand" "testing" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" 
"github.com/filecoin-project/go-statemachine/fsm" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/ipfs/go-cid" + "github.com/ipld/go-car" + "github.com/ipld/go-ipld-prime" + ipldfree "github.com/ipld/go-ipld-prime/impl/free" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -181,3 +190,141 @@ func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...inter fdg.called = true return fdg.returnedErr } + +func TestCommPGenerationWithMetadata(t *testing.T) { + tempFilePath := filestore.Path("applesauce.jpg") + tempFile := shared_testutil.NewTestFile(shared_testutil.TestFileParams{Path: tempFilePath}) + payloadCid := shared_testutil.GenerateCids(1)[0] + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + selector := ssb.ExploreAll(ssb.Matcher()).Node() + proofType := abi.RegisteredProof_StackedDRG2KiBPoSt + pieceCid := shared_testutil.GenerateCids(1)[0] + piecePath := filestore.Path("apiece.jpg") + pieceSize := abi.UnpaddedPieceSize(rand.Uint64()) + testCases := map[string]struct { + fileStoreParams shared_testutil.TestFileStoreParams + commPErr error + expectedPieceCid cid.Cid + expectedPiecePath filestore.Path + expectedMetadataPath filestore.Path + shouldErr bool + }{ + "success": { + fileStoreParams: shared_testutil.TestFileStoreParams{ + AvailableTempFiles: []filestore.File{tempFile}, + }, + expectedPieceCid: pieceCid, + expectedPiecePath: piecePath, + expectedMetadataPath: tempFilePath, + shouldErr: false, + }, + "tempfile creations fails": { + fileStoreParams: shared_testutil.TestFileStoreParams{}, + shouldErr: true, + }, + "commP generation fails": { + fileStoreParams: shared_testutil.TestFileStoreParams{ + AvailableTempFiles: []filestore.File{tempFile}, + ExpectedDeletions: []filestore.Path{tempFile.Path()}, + }, + commPErr: errors.New("Could not generate commP"), + shouldErr: true, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + fcp := &fakeCommPGenerator{pieceCid, piecePath, pieceSize, testCase.commPErr} + fs := shared_testutil.NewTestFileStore(testCase.fileStoreParams) + resultPieceCid, resultPiecePath, resultMetadataPath, resultErr := providerutils.GeneratePieceCommitmentWithMetadata( + fs, fcp.GenerateCommPToFile, proofType, payloadCid, selector) + require.Equal(t, resultPieceCid, testCase.expectedPieceCid) + require.Equal(t, resultPiecePath, testCase.expectedPiecePath) + require.Equal(t, resultMetadataPath, testCase.expectedMetadataPath) + if testCase.shouldErr { + require.Error(t, resultErr) + } else { + require.NoError(t, resultErr) + } + fs.VerifyExpectations(t) + }) + } +} + +type fakeCommPGenerator struct { + pieceCid cid.Cid + path filestore.Path + size abi.UnpaddedPieceSize + err error +} + +func (fcp *fakeCommPGenerator) 
diff --git a/storagemarket/network/types_cbor_gen.go b/storagemarket/network/types_cbor_gen.go
index 419e4e42..6859bbdf 100644
--- a/storagemarket/network/types_cbor_gen.go
+++ b/storagemarket/network/types_cbor_gen.go
@@ -6,12 +6,11 @@ import (
 	"fmt"
 	"io"
 
+	"github.com/filecoin-project/go-fil-markets/storagemarket"
 	"github.com/filecoin-project/specs-actors/actors/builtin/market"
 	"github.com/filecoin-project/specs-actors/actors/crypto"
 	cbg "github.com/whyrusleeping/cbor-gen"
 	xerrors "golang.org/x/xerrors"
-
-	"github.com/filecoin-project/go-fil-markets/storagemarket"
 )
 
 var _ = xerrors.Errorf
diff --git a/storagemarket/types.go b/storagemarket/types.go
index 61f70ae2..81e7e2ab 100644
--- a/storagemarket/types.go
+++ b/storagemarket/types.go
@@ -99,12 +99,14 @@ var StorageAskUndefined = StorageAsk{}
 
 type MinerDeal struct {
 	market.ClientDealProposal
-	ProposalCid cid.Cid
-	Miner       peer.ID
-	Client      peer.ID
-	State       StorageDealStatus
-	PiecePath   filestore.Path
-	Message     string
+	ProposalCid      cid.Cid
+	Miner            peer.ID
+	Client           peer.ID
+	State            StorageDealStatus
+	PiecePath        filestore.Path
+	MetadataPath     filestore.Path
+	ConnectionClosed bool
+	Message          string
 
 	Ref *DataRef
@@ -176,6 +178,9 @@ const (
 	// ProviderEventPieceStoreErrored happens when an attempt to save data in the piece store errors
 	ProviderEventPieceStoreErrored
 
+	// ProviderEventReadMetadataErrored happens when an error occurs reading recorded piece metadata
+	ProviderEventReadMetadataErrored
+
 	// ProviderEventDealCompleted happens when a deal completes successfully
 	ProviderEventDealCompleted
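One detail worth noting before the regenerated CBOR code below: cbor-gen encodes MinerDeal as a fixed-length CBOR array, so adding MetadataPath changes both the array-header byte written on encode (137 -> 138) and the element count checked on decode (9 -> 10). The header byte is simply major type 4 (array) in the top three bits with the length in the low five bits; the small standalone sketch here (illustrative only, standard library arithmetic, hypothetical helper names) shows why those two values correspond to array lengths 9 and 10.

package example

import "fmt"

// cborArrayHeader returns the single header byte for a CBOR array of length n (valid for n < 24):
// major type 4 shifted into the top three bits, the length in the low five.
func cborArrayHeader(n byte) byte {
	const majArray = 4
	return majArray<<5 | n
}

func printMinerDealHeaders() {
	fmt.Println(cborArrayHeader(9))  // 137 (0x89): the previous 9-element MinerDeal array
	fmt.Println(cborArrayHeader(10)) // 138 (0x8a): the new 10-element array including MetadataPath
}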
diff --git a/storagemarket/types_cbor_gen.go b/storagemarket/types_cbor_gen.go
index 727e61d6..f7b5307c 100644
--- a/storagemarket/types_cbor_gen.go
+++ b/storagemarket/types_cbor_gen.go
@@ -6,13 +6,12 @@ import (
 	"fmt"
 	"io"
 
+	"github.com/filecoin-project/go-fil-markets/filestore"
 	"github.com/filecoin-project/specs-actors/actors/abi"
 	"github.com/filecoin-project/specs-actors/actors/crypto"
 	"github.com/libp2p/go-libp2p-core/peer"
 	cbg "github.com/whyrusleeping/cbor-gen"
 	xerrors "golang.org/x/xerrors"
-
-	"github.com/filecoin-project/go-fil-markets/filestore"
 )
 
 var _ = xerrors.Errorf
@@ -234,7 +233,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error {
 		_, err := w.Write(cbg.CborNull)
 		return err
 	}
-	if _, err := w.Write([]byte{137}); err != nil {
+	if _, err := w.Write([]byte{138}); err != nil {
 		return err
 	}
 
@@ -290,6 +289,18 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error {
 		return err
 	}
 
+	// t.MetadataPath (filestore.Path) (string)
+	if len(t.MetadataPath) > cbg.MaxLength {
+		return xerrors.Errorf("Value in field t.MetadataPath was too long")
+	}
+
+	if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.MetadataPath)))); err != nil {
+		return err
+	}
+	if _, err := w.Write([]byte(t.MetadataPath)); err != nil {
+		return err
+	}
+
 	// t.Message (string) (string)
 	if len(t.Message) > cbg.MaxLength {
 		return xerrors.Errorf("Value in field t.Message was too long")
@@ -325,7 +336,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
 		return fmt.Errorf("cbor input should be of type array")
 	}
 
-	if extra != 9 {
+	if extra != 10 {
 		return fmt.Errorf("cbor input had wrong number of fields")
 	}
 
@@ -390,6 +401,16 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
 
 		t.PiecePath = filestore.Path(sval)
 	}
+	// t.MetadataPath (filestore.Path) (string)
+
+	{
+		sval, err := cbg.ReadString(br)
+		if err != nil {
+			return err
+		}
+
+		t.MetadataPath = filestore.Path(sval)
+	}
 	// t.Message (string) (string)
 
 	{