From 26954e00718da9112634e76527d07c81840983ab Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 11 Aug 2023 16:15:13 +1000 Subject: [PATCH] feat: meta=eof for IPIP-431; ask for and expect (but not require) from http fetches Ref: https://github.com/ipfs/specs/pull/431 Ref: https://github.com/ipld/frisbii/pull/15 --- cmd/lassie/fetch.go | 3 + go.mod | 2 +- go.sum | 8 +- pkg/httputil/constants.go | 19 +++ pkg/httputil/metadata/metadata.go | 70 +++++++++++ pkg/httputil/metadata/metadata.ipldsch | 31 +++++ pkg/httputil/metadata/metadata_test.go | 118 ++++++++++++++++++ .../http/util.go => httputil/server.go} | 27 ++-- pkg/internal/testutil/mockroundtripper.go | 19 ++- pkg/retriever/httpretriever.go | 56 +++++++-- pkg/server/http/ipfs.go | 32 ++--- pkg/types/types.go | 18 ++- pkg/verifiedcar/verifiedcar.go | 17 +-- 13 files changed, 357 insertions(+), 63 deletions(-) create mode 100644 pkg/httputil/constants.go create mode 100644 pkg/httputil/metadata/metadata.go create mode 100644 pkg/httputil/metadata/metadata.ipldsch create mode 100644 pkg/httputil/metadata/metadata_test.go rename pkg/{server/http/util.go => httputil/server.go} (84%) diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index 38f8bdfe..ffd207f4 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -291,6 +291,9 @@ func defaultFetchRun( blockCount, humanize.IBytes(stats.Size), ) + if stats.CarProperties != nil { + fmt.Fprintf(msgWriter, "\tChecksum: %x\n", stats.CarProperties.ChecksumMultihash) + } return nil } diff --git a/go.mod b/go.mod index cb9a6762..3a3e4d84 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/ipfs/go-unixfsnode v1.7.1 github.com/ipld/go-car/v2 v2.10.1 github.com/ipld/go-codec-dagpb v1.6.0 - github.com/ipld/go-ipld-prime v0.20.1-0.20230329011551-5056175565b0 + github.com/ipld/go-ipld-prime v0.21.1-0.20230811030745-6e31cea491de github.com/ipni/go-libipni v0.0.8-0.20230425184153-86a1fcb7f7ff github.com/libp2p/go-libp2p v0.27.8 github.com/libp2p/go-libp2p-routing-helpers v0.7.0 diff --git a/go.sum b/go.sum index 4c1ad4f9..c0ea26d5 100644 --- a/go.sum +++ b/go.sum @@ -126,7 +126,7 @@ github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= -github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -336,8 +336,8 @@ github.com/ipld/go-car/v2 v2.10.1 h1:MRDqkONNW9WRhB79u+Z3U5b+NoN7lYA5B8n8qI3+BoI github.com/ipld/go-car/v2 v2.10.1/go.mod h1:sQEkXVM3csejlb1kCCb+vQ/pWBKX9QtvsrysMQjOgOg= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= -github.com/ipld/go-ipld-prime v0.20.1-0.20230329011551-5056175565b0 h1:iJTl9tx5DEsnKpppX5PmfdoQ3ITuBmkh3yyEpHWY2SI= -github.com/ipld/go-ipld-prime v0.20.1-0.20230329011551-5056175565b0/go.mod h1:wmOtdy70ajP48iZITH8uLsGJVMqA4EJM61/bSfYYGhs= +github.com/ipld/go-ipld-prime v0.21.1-0.20230811030745-6e31cea491de h1:N6Wfk6dvcBjF4AJJDSmti6CkgHWZPDZ0fuqSQL+kKnU= +github.com/ipld/go-ipld-prime v0.21.1-0.20230811030745-6e31cea491de/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipni/go-libipni v0.0.8-0.20230425184153-86a1fcb7f7ff h1:xbKrIvnpQkbF8iHPk/HGcegsypCDpcXWHhzBCLyCWf8= github.com/ipni/go-libipni v0.0.8-0.20230425184153-86a1fcb7f7ff/go.mod h1:paYP9U4N3/vOzGCuN9kU972vtvw9JUcQjOKyiCFGwRk= @@ -616,7 +616,7 @@ github.com/urfave/cli/v2 v2.24.4 h1:0gyJJEBYtCV87zI/x2nZCPyDxD51K6xM8SkwjHFCNEU= github.com/urfave/cli/v2 v2.24.4/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= -github.com/warpfork/go-testmark v0.11.0 h1:J6LnV8KpceDvo7spaNU4+DauH2n1x+6RaO2rJrmpQ9U= +github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= diff --git a/pkg/httputil/constants.go b/pkg/httputil/constants.go new file mode 100644 index 00000000..100c483c --- /dev/null +++ b/pkg/httputil/constants.go @@ -0,0 +1,19 @@ +package httputil + +import "fmt" + +const ( + MimeTypeCar = "application/vnd.ipld.car" // The only accepted MIME type + MimeTypeCarVersion = "1" // We only accept version 1 of the MIME type + ResponseAcceptRangesHeader = "none" // We currently don't accept range requests + ResponseCacheControlHeader = "public, max-age=29030400, immutable" // Magic cache control values + FilenameExtCar = ".car" // The only valid filename extension + FormatParameterCar = "car" // The only valid format parameter value + DefaultIncludeDupes = true // The default value for an unspecified "dups" parameter. See https://github.com/ipfs/specs/pull/412. +) + +var ( + ResponseChunkDelimeter = []byte("0\r\n") // An http/1.1 chunk delimeter, used for specifying an early end to the response + ResponseContentTypeHeader = fmt.Sprintf("%s; version=%s; order=dfs; dups=y", MimeTypeCar, MimeTypeCarVersion) + RequestAcceptHeader = fmt.Sprintf("%s; version=%s; order=dfs; dups=y; meta=eof", MimeTypeCar, MimeTypeCarVersion) +) diff --git a/pkg/httputil/metadata/metadata.go b/pkg/httputil/metadata/metadata.go new file mode 100644 index 00000000..e85f9dfc --- /dev/null +++ b/pkg/httputil/metadata/metadata.go @@ -0,0 +1,70 @@ +package metadata + +import ( + "fmt" + "io" + + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime/codec/dagjson" + bindnoderegistry "github.com/ipld/go-ipld-prime/node/bindnode/registry" + mh "github.com/multiformats/go-multihash" + + _ "embed" +) + +//go:embed metadata.ipldsch +var schema []byte + +var BindnodeRegistry = bindnoderegistry.NewRegistry() + +type CarMetadata struct { + Metadata *Metadata +} + +func (cm CarMetadata) Serialize(w io.Writer) error { + // TODO: do the same checks we do on Deserialize() + return BindnodeRegistry.TypeToWriter(&cm, w, dagjson.Encode) +} + +func (cm *CarMetadata) Deserialize(r io.Reader) error { + cmIface, err := BindnodeRegistry.TypeFromReader(r, &CarMetadata{}, dagjson.Decode) + if err != nil { + return fmt.Errorf("invalid CarMetadata: %w", err) + } + cmm := cmIface.(*CarMetadata) // safe to assume type + if cmm.Metadata.Properties == nil && cmm.Metadata.Error == nil { + return fmt.Errorf("invalid CarMetadata: must contain either properties or error fields") + } + if (cmm.Metadata.Properties == nil) == (cmm.Metadata.Error == nil) { + return fmt.Errorf("invalid CarMetadata: must contain either properties or error fields, not both") + } + if cmm.Metadata.Properties != nil { + if _, err := mh.Decode(cmm.Metadata.Properties.ChecksumMultihash); err != nil { + return fmt.Errorf("invalid CarMetadata: checksum multihash: %w", err) + } + } + // TODO: parse and check EntityBytes format + *cm = *cmm + return nil +} + +type Metadata struct { + Request Request + Properties *types.CarProperties + Error *string +} + +type Request struct { + Root cid.Cid + Path *string + Scope types.DagScope + Duplicates bool + EntityBytes *string +} + +func init() { + if err := BindnodeRegistry.RegisterType((*CarMetadata)(nil), string(schema), "CarMetadata"); err != nil { + panic(err.Error()) + } +} diff --git a/pkg/httputil/metadata/metadata.ipldsch b/pkg/httputil/metadata/metadata.ipldsch new file mode 100644 index 00000000..ae27840f --- /dev/null +++ b/pkg/httputil/metadata/metadata.ipldsch @@ -0,0 +1,31 @@ +type CarMetadata union { + | Metadata "car-metadata/v1" +} representation keyed + +type Metadata struct { + request Request + # must contain either a properties or an error + properties optional CarProperties + error optional String +} + +type Request struct { + root &Any + path optional String + scope DagScope + duplicates Bool (rename "dups") + entityBytes optional String (rename "entity-bytes") # Must be a valid entity-bytes param: "from:to" +} + +type DagScope enum { + | all + | entity + | block +} + +type CarProperties struct { + carBytes Int (rename "car_bytes") + dataBytes Int (rename "data_bytes") + blockCount Int (rename "block_count") + checksumMultihash optional Bytes (rename "checksum") # Must be a valid multihash +} diff --git a/pkg/httputil/metadata/metadata_test.go b/pkg/httputil/metadata/metadata_test.go new file mode 100644 index 00000000..c6b776a3 --- /dev/null +++ b/pkg/httputil/metadata/metadata_test.go @@ -0,0 +1,118 @@ +package metadata_test + +import ( + "bytes" + "testing" + + "github.com/filecoin-project/lassie/pkg/httputil/metadata" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +var testCid = cid.MustParse("bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4") + +func TestCarMetadataRoundtrip(t *testing.T) { + path := "/birb.mp4" + orig := metadata.CarMetadata{ + Metadata: &metadata.Metadata{ + Request: metadata.Request{ + Root: testCid, + Path: &path, + Scope: types.DagScopeAll, + Duplicates: true, + }, + Properties: &types.CarProperties{ + CarBytes: 202020, + DataBytes: 101010, + BlockCount: 303, + ChecksumMultihash: testCid.Hash(), + }, + }, + } + var buf bytes.Buffer + require.NoError(t, orig.Serialize(&buf)) + + t.Log("metadata dag-json:", buf.String()) + + var roundtrip metadata.CarMetadata + require.NoError(t, roundtrip.Deserialize(&buf)) + require.Equal(t, orig, roundtrip) + require.NotNil(t, roundtrip.Metadata) + require.Equal(t, testCid, roundtrip.Metadata.Request.Root) + require.NotNil(t, roundtrip.Metadata.Request.Path) + require.Equal(t, "/birb.mp4", *roundtrip.Metadata.Request.Path) + require.Equal(t, types.DagScopeAll, roundtrip.Metadata.Request.Scope) + require.True(t, roundtrip.Metadata.Request.Duplicates) + require.NotNil(t, roundtrip.Metadata.Properties) + require.Nil(t, roundtrip.Metadata.Error) + require.Equal(t, int64(202020), roundtrip.Metadata.Properties.CarBytes) + require.Equal(t, int64(101010), roundtrip.Metadata.Properties.DataBytes) + require.Equal(t, int64(303), roundtrip.Metadata.Properties.BlockCount) + require.Equal(t, []byte(testCid.Hash()), roundtrip.Metadata.Properties.ChecksumMultihash) +} + +func TestCarMetadataErrorRoundtrip(t *testing.T) { + path := "/birb.mp4" + msg := "something bad happened" + orig := metadata.CarMetadata{ + Metadata: &metadata.Metadata{ + Request: metadata.Request{ + Root: testCid, + Path: &path, + Scope: types.DagScopeAll, + Duplicates: true, + }, + Error: &msg, + }, + } + var buf bytes.Buffer + require.NoError(t, orig.Serialize(&buf)) + + t.Log("metadata dag-json:", buf.String()) + + var roundtrip metadata.CarMetadata + require.NoError(t, roundtrip.Deserialize(&buf)) + require.Equal(t, orig, roundtrip) + require.NotNil(t, roundtrip.Metadata) + require.Equal(t, testCid, roundtrip.Metadata.Request.Root) + require.NotNil(t, roundtrip.Metadata.Request.Path) + require.Equal(t, "/birb.mp4", *roundtrip.Metadata.Request.Path) + require.Equal(t, types.DagScopeAll, roundtrip.Metadata.Request.Scope) + require.True(t, roundtrip.Metadata.Request.Duplicates) + require.Nil(t, roundtrip.Metadata.Properties) + require.NotNil(t, roundtrip.Metadata.Error) + require.Equal(t, "something bad happened", *roundtrip.Metadata.Error) +} + +func TestBadMetadata(t *testing.T) { + testCases := []struct { + name string + byts string + err string + }{ + {"empty", `{}`, `union structure constraints for CarMetadata caused rejection: a union must have exactly one entry`}, + {"bad key", `{"not metadata":true}`, `union structure constraints for CarMetadata caused rejection: no member named "not metadata"`}, + { + "bad multihash", + `{"car-metadata/v1":{"properties":{"block_count":303,"car_bytes":202020,"checksum":{"/":{"bytes":"bm90IGEgbXVsdGloYXNo"}},"data_bytes":101010},"request":{"dups":true,"path":"/birb.mp4","root":{"/":"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"},"scope":"all"}}}`, + `invalid CarMetadata: checksum multihash:`, + }, + { + "no properties or error", + `{"car-metadata/v1":{"request":{"dups":true,"path":"/birb.mp4","root":{"/":"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"},"scope":"all"}}}`, + `invalid CarMetadata: must contain either properties or error fields`, + }, + { + "both properties and error", + `{"car-metadata/v1":{"error":"something bad happened","properties":{"block_count":303,"car_bytes":202020,"checksum":{"/":{"bytes":"EiBd9neBCasGxUmysJN7nGza4ylHikmbsP2+nXs6BlIpvw"}},"data_bytes":101010},"request":{"dups":true,"path":"/birb.mp4","root":{"/":"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"},"scope":"all"}}}`, + `invalid CarMetadata: must contain either properties or error fields, not both`, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var roundtrip metadata.CarMetadata + require.ErrorContains(t, roundtrip.Deserialize(bytes.NewBuffer([]byte(tc.byts))), tc.err) + }) + } +} diff --git a/pkg/server/http/util.go b/pkg/httputil/server.go similarity index 84% rename from pkg/server/http/util.go rename to pkg/httputil/server.go index 13be0574..6e31a727 100644 --- a/pkg/server/http/util.go +++ b/pkg/httputil/server.go @@ -1,4 +1,4 @@ -package httpserver +package httputil import ( "errors" @@ -74,33 +74,34 @@ func ParseFilename(req *http.Request) (string, error) { // // Lassie only allows the "car" format query parameter // https://specs.ipfs.tech/http-gateways/path-gateway/#format-request-query-parameter -func CheckFormat(req *http.Request) (bool, error) { +func CheckFormat(req *http.Request) (bool, bool, error) { hasAccept := req.Header.Get("Accept") != "" // check if Accept header includes application/vnd.ipld.car - validAccept, includeDupes := ParseAccept(req.Header.Get("Accept")) + validAccept, includeDupes, includeMeta := ParseAccept(req.Header.Get("Accept")) if hasAccept && !validAccept { - return false, fmt.Errorf("no acceptable content type") + return false, false, fmt.Errorf("no acceptable content type") } // check if format is "car" hasFormat := req.URL.Query().Has("format") if hasFormat && req.URL.Query().Get("format") != FormatParameterCar { - return false, fmt.Errorf("requested non-supported format %s", req.URL.Query().Get("format")) + return false, false, fmt.Errorf("requested non-supported format %s", req.URL.Query().Get("format")) } // if neither are provided return // one of them has to be given with a CAR type since we only return CAR data if !validAccept && !hasFormat { - return false, fmt.Errorf("neither a valid accept header or format parameter were provided") + return false, false, fmt.Errorf("neither a valid accept header or format parameter were provided") } - return includeDupes, nil + return includeDupes, includeMeta, nil } // ParseAccept validates that the request Accept header is of the type CAR and // returns whether or not duplicate blocks are allowed in the response via -// IPIP-412: https://github.com/ipfs/specs/pull/412. -func ParseAccept(acceptHeader string) (validAccept bool, includeDupes bool) { +// IPIP-412: https://github.com/ipfs/specs/pull/412, and whether or not +// metadata is requested via IPIP-431: https://github.com/ipfs/specs/pull/431. +func ParseAccept(acceptHeader string) (validAccept bool, includeDupes bool, includeMeta bool) { acceptTypes := strings.Split(acceptHeader, ",") validAccept = false includeDupes = DefaultIncludeDupes @@ -140,6 +141,14 @@ func ParseAccept(acceptHeader string) (validAccept bool, includeDupes bool) { // we only do dfs, which also satisfies unk, future extensions are not yet supported validAccept = false } + case "meta": + switch value { + case "eof": + includeMeta = true + default: + // we only support eof, future extensions are not yet supported + validAccept = false + } default: // ignore others } diff --git a/pkg/internal/testutil/mockroundtripper.go b/pkg/internal/testutil/mockroundtripper.go index a20b0b49..73b68cab 100644 --- a/pkg/internal/testutil/mockroundtripper.go +++ b/pkg/internal/testutil/mockroundtripper.go @@ -110,7 +110,7 @@ func (mrt *MockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error legacyScope = "file" } require.Equal(mrt.t, req.URL.RawQuery, fmt.Sprintf("dag-scope=%s&car-scope=%s", expectedScope, legacyScope)) - require.Equal(mrt.t, req.Header["Accept"], []string{"application/vnd.ipld.car;version=1;order=dfs;dups=y"}) + require.Equal(mrt.t, req.Header["Accept"], []string{"application/vnd.ipld.car; version=1; order=dfs; dups=y; meta=eof"}) reqId := req.Header["X-Request-Id"] require.Len(mrt.t, reqId, 1) _, err = uuid.Parse(reqId[0]) @@ -175,16 +175,20 @@ func (mrt *MockRoundTripper) VerifyRetrievalsServed(ctx context.Context, t *test } func (mrt *MockRoundTripper) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) { - retrievals := make([]peer.ID, 0, len(expectedRetrievals)) + expectedRetrievalsStr := make([]string, 0, len(expectedRetrievals)) + for _, er := range expectedRetrievals { + expectedRetrievalsStr = append(expectedRetrievalsStr, er.String()) + } + retrievals := make([]string, 0, len(expectedRetrievals)) for i := 0; i < len(expectedRetrievals); i++ { select { case retrieval := <-mrt.endsCh: - retrievals = append(retrievals, retrieval) + retrievals = append(retrievals, retrieval.String()) case <-ctx.Done(): require.FailNowf(t, "failed to complete expected retrievals", "expected %d, received %d @ %s", len(expectedRetrievals), i, afterStart) } } - require.ElementsMatch(t, expectedRetrievals, retrievals) + require.ElementsMatch(t, expectedRetrievalsStr, retrievals) } // deferredBody is simply a Reader that lazily starts a CAR writer on the first @@ -196,6 +200,7 @@ type deferredBody struct { r io.ReadCloser once sync.Once + err error // cache Read error for multiple reads } func newDeferredBody(mrt *MockRoundTripper, remote MockRoundTripRemote, root cid.Cid) *deferredBody { @@ -297,6 +302,9 @@ func (d *deferredBody) makeBody() io.ReadCloser { } func (d *deferredBody) Read(p []byte) (n int, err error) { + if d.err != nil { + return 0, d.err + } d.once.Do(func() { d.r = d.makeBody() }) @@ -304,6 +312,9 @@ func (d *deferredBody) Read(p []byte) (n int, err error) { if err == io.EOF { d.mrt.endsCh <- d.remote.Peer.ID } + if err != nil { + d.err = err + } return n, err } diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index 0d85f951..b8987f07 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -12,10 +12,12 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lassie/pkg/build" "github.com/filecoin-project/lassie/pkg/events" + "github.com/filecoin-project/lassie/pkg/httputil" + httpmetadata "github.com/filecoin-project/lassie/pkg/httputil/metadata" "github.com/filecoin-project/lassie/pkg/types" "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" - "github.com/ipni/go-libipni/metadata" + ipnimetadata "github.com/ipni/go-libipni/metadata" "github.com/multiformats/go-multicodec" ) @@ -75,8 +77,8 @@ func (ph ProtocolHttp) Code() multicodec.Code { return multicodec.TransportIpfsGatewayHttp } -func (ph ProtocolHttp) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol { - return &metadata.IpfsGatewayHttp{} +func (ph ProtocolHttp) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata ipnimetadata.Protocol) ipnimetadata.Protocol { + return &ipnimetadata.IpfsGatewayHttp{} } func (ph *ProtocolHttp) Connect(ctx context.Context, retrieval *retrieval, startTime time.Time, candidate types.RetrievalCandidate) (time.Duration, error) { @@ -116,27 +118,54 @@ func (ph *ProtocolHttp) Retrieve( if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, ErrHttpRequestFailure{Code: resp.StatusCode} } + + _, _, includeMeta := httputil.ParseAccept(resp.Header.Get("Content-Type")) + var ttfb time.Duration rdr := newTimeToFirstByteReader(resp.Body, func() { ttfb = retrieval.Clock.Since(retrievalStart) shared.sendEvent(ctx, events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate, ttfb, multicodec.TransportIpfsGatewayHttp)) }) cfg := verifiedcar.Config{ - Root: retrieval.request.Cid, - Selector: retrieval.request.GetSelector(), - ExpectDuplicatesIn: true, - MaxBlocks: retrieval.request.MaxBlocks, + Root: retrieval.request.Cid, + Selector: retrieval.request.GetSelector(), + ExpectDuplicatesIn: true, + MaxBlocks: retrieval.request.MaxBlocks, + ZeroLengthSectionAsEof: includeMeta, } blockCount, byteCount, err := cfg.VerifyCar(ctx, rdr, retrieval.request.LinkSystem) if err != nil { return nil, err } + var md httpmetadata.CarMetadata + if includeMeta { + if err := md.Deserialize(resp.Body); err != nil { + logger.Debugf("Did not get trailing metadata from http fetch from %s: %v", candidate.MinerPeer.ID, err) + } + if md.Metadata != nil { + if md.Metadata.Error != nil { + return nil, fmt.Errorf("storage provider reported an error: %v", *md.Metadata.Error) + } + if md.Metadata.Properties != nil { + if md.Metadata.Properties.BlockCount != int64(blockCount) { + return nil, fmt.Errorf("storage provider reported %d blocks, but we received %d", md.Metadata.Properties.BlockCount, blockCount) + } + if md.Metadata.Properties.CarBytes != rdr.count-1 { // -1 for the NUL byte at the end of the CAR + return nil, fmt.Errorf("storage provider reported CAR was %d bytes, but we received %d", md.Metadata.Properties.CarBytes, rdr.count) + } + if md.Metadata.Properties.DataBytes != int64(byteCount) { + return nil, fmt.Errorf("storage provider reported CAR block data was %d bytes, but we received %d", md.Metadata.Properties.DataBytes, byteCount) + } + // TODO: checksum? + } + } + } duration := retrieval.Clock.Since(retrievalStart) speed := uint64(float64(byteCount) / duration.Seconds()) - return &types.RetrievalStats{ + stats := types.RetrievalStats{ RootCid: candidate.RootCid, StorageProviderId: candidate.MinerPeer.ID, Size: byteCount, @@ -147,7 +176,12 @@ func (ph *ProtocolHttp) Retrieve( NumPayments: 0, AskPrice: big.Zero(), TimeToFirstByte: ttfb, - }, nil + } + if md.Metadata != nil && md.Metadata.Properties != nil { + stats.CarProperties = md.Metadata.Properties + } + + return &stats, nil } func (ph *ProtocolHttp) beginRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (resp *http.Response, err error) { @@ -178,7 +212,7 @@ func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate logger.Warnf("Couldn't construct a http request %s: %v", candidate.MinerPeer.ID, err) return nil, fmt.Errorf("%w for peer %s: %v", ErrBadPathForRequest, candidate.MinerPeer.ID, err) } - req.Header.Add("Accept", request.Scope.AcceptHeader()) + req.Header.Add("Accept", httputil.RequestAcceptHeader) req.Header.Add("X-Request-Id", request.RetrievalID.String()) req.Header.Add("User-Agent", build.UserAgent) @@ -190,6 +224,7 @@ var _ io.Reader = (*timeToFirstByteReader)(nil) type timeToFirstByteReader struct { r io.Reader first bool + count int64 cb func() } @@ -205,5 +240,6 @@ func (t *timeToFirstByteReader) Read(p []byte) (int, error) { t.first = true defer t.cb() } + t.count += int64(len(p)) return t.r.Read(p) } diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index 3ea6d477..d80dba65 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -8,6 +8,7 @@ import ( "strconv" "github.com/filecoin-project/lassie/pkg/build" + "github.com/filecoin-project/lassie/pkg/httputil" "github.com/filecoin-project/lassie/pkg/retriever" "github.com/filecoin-project/lassie/pkg/storage" "github.com/filecoin-project/lassie/pkg/types" @@ -18,21 +19,6 @@ import ( "github.com/multiformats/go-multicodec" ) -const ( - MimeTypeCar = "application/vnd.ipld.car" // The only accepted MIME type - MimeTypeCarVersion = "1" // We only accept version 1 of the MIME type - FormatParameterCar = "car" // The only valid format parameter value - FilenameExtCar = ".car" // The only valid filename extension - DefaultIncludeDupes = true // The default value for an unspecified "dups" parameter. See https://github.com/ipfs/specs/pull/412. - ResponseAcceptRangesHeader = "none" // We currently don't accept range requests - ResponseCacheControlHeader = "public, max-age=29030400, immutable" // Magic cache control values -) - -var ( - ResponseChunkDelimeter = []byte("0\r\n") // An http/1.1 chunk delimeter, used for specifying an early end to the response - ResponseContentTypeHeader = fmt.Sprintf("%s; version=%s", MimeTypeCar, MimeTypeCarVersion) -) - func ipfsHandler(fetcher types.Fetcher, cfg HttpServerConfig) func(http.ResponseWriter, *http.Request) { return func(res http.ResponseWriter, req *http.Request) { statusLogger := newStatusLogger(req.Method, req.URL.Path) @@ -56,13 +42,13 @@ func ipfsHandler(fetcher types.Fetcher, cfg HttpServerConfig) func(http.Response return } - includeDupes, err := CheckFormat(req) + includeDupes, _, err := httputil.CheckFormat(req) if err != nil { errorResponse(res, statusLogger, http.StatusBadRequest, err) return } - fileName, err := ParseFilename(req) + fileName, err := httputil.ParseFilename(req) if err != nil { errorResponse(res, statusLogger, http.StatusBadRequest, err) return @@ -77,7 +63,7 @@ func ipfsHandler(fetcher types.Fetcher, cfg HttpServerConfig) func(http.Response return } - dagScope, err := ParseScope(req) + dagScope, err := httputil.ParseScope(req) if err != nil { errorResponse(res, statusLogger, http.StatusBadRequest, err) return @@ -97,7 +83,7 @@ func ipfsHandler(fetcher types.Fetcher, cfg HttpServerConfig) func(http.Response // for setting Content-Disposition header based on filename url parameter if fileName == "" { - fileName = fmt.Sprintf("%s%s", rootCid.String(), FilenameExtCar) + fileName = fmt.Sprintf("%s%s", rootCid.String(), httputil.FilenameExtCar) } retrievalId, err := types.NewRetrievalID() @@ -150,9 +136,9 @@ func ipfsHandler(fetcher types.Fetcher, cfg HttpServerConfig) func(http.Response // called once we start writing blocks into the CAR (on the first Put()) res.Header().Set("Server", build.UserAgent) // "lassie/vx.y.z-" res.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", fileName)) - res.Header().Set("Accept-Ranges", ResponseAcceptRangesHeader) - res.Header().Set("Cache-Control", ResponseCacheControlHeader) - res.Header().Set("Content-Type", ResponseContentTypeHeader) + res.Header().Set("Accept-Ranges", httputil.ResponseAcceptRangesHeader) + res.Header().Set("Cache-Control", httputil.ResponseCacheControlHeader) + res.Header().Set("Content-Type", httputil.ResponseContentTypeHeader) res.Header().Set("Etag", request.Etag()) res.Header().Set("X-Content-Type-Options", "nosniff") res.Header().Set("X-Ipfs-Path", "/"+datamodel.ParsePath(req.URL.Path).String()) @@ -270,7 +256,7 @@ func closeWithUnterminatedChunk(res http.ResponseWriter) error { if err != nil { return fmt.Errorf("unable to access conn through hijack interface: %w", err) } - if _, err := buf.Write(ResponseChunkDelimeter); err != nil { + if _, err := buf.Write(httputil.ResponseChunkDelimeter); err != nil { return fmt.Errorf("writing response chunk delimiter: %w", err) } if err := buf.Flush(); err != nil { diff --git a/pkg/types/types.go b/pkg/types/types.go index cf7d3c1f..245f55d2 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -139,6 +139,20 @@ type RetrievalStats struct { AskPrice abi.TokenAmount TimeToFirstByte time.Duration Selector string + CarProperties *CarProperties // only set if CAR file is fetched via http and the provider provides the metadata +} + +// CarProperties is supplied by the storage provider during HTTP fetches and +// provides metadata about the CAR file. +type CarProperties struct { + CarBytes int64 + DataBytes int64 + BlockCount int64 + ChecksumMultihash []byte +} + +func (cp CarProperties) String() string { + return fmt.Sprintf("CarProperties", cp.CarBytes, cp.DataBytes, cp.BlockCount, cp.ChecksumMultihash) } type RetrievalResult struct { @@ -283,7 +297,3 @@ func (ds DagScope) TerminalSelectorSpec() builder.SelectorSpec { } panic(fmt.Sprintf("unknown DagScope: [%s]", string(ds))) } - -func (ds DagScope) AcceptHeader() string { - return "application/vnd.ipld.car;version=1;order=dfs;dups=y" -} diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 03a4c4e6..f4a4fe99 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -46,13 +46,14 @@ type BlockReader interface { var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) type Config struct { - Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against - AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 - Selector datamodel.Node // The selector to execute, starting at the provided Root, to verify the contents of the CAR - CheckRootsMismatch bool // Check if roots match expected behavior - ExpectDuplicatesIn bool // Handles whether the incoming stream has duplicates - WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks - MaxBlocks uint64 // set a budget for the traversal + Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against + AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 + Selector datamodel.Node // The selector to execute, starting at the provided Root, to verify the contents of the CAR + CheckRootsMismatch bool // Check if roots match expected behavior + ExpectDuplicatesIn bool // Handles whether the incoming stream has duplicates + WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks + MaxBlocks uint64 // Set a budget for the traversal + ZeroLengthSectionAsEof bool // If true, treat a zero-length section as CAR EOF, use if we expect trailing bytes (i.e. for metadata) } func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { return nil } @@ -69,7 +70,7 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) // // * https://specs.ipfs.tech/http-gateways/path-gateway/ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { - cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false)) + cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false), car.ZeroLengthSectionAsEOF(cfg.ZeroLengthSectionAsEof)) if err != nil { // TODO: post-1.19: fmt.Errorf("%w: %w", ErrMalformedCar, err) return 0, 0, multierr.Combine(ErrMalformedCar, err)