Skip to content

Commit

Permalink
Unexport unecesserry index APIs and integrate with mulitcodec
Browse files Browse the repository at this point in the history
Use the codec dedicated to CAR index sorted when marshalling and
unmarshalling `indexSorted`. Note, the code depends on a specific commit
of `go-multicodec` `master` branch. This needs to be replaced with a tag
once a release is made on the go-multicodec side later.

Unexport index APIs that should not be exposed publicly. Remove
`Builder` now that it is not needed anywhere. Move `insertionIndex` into
`blockstore` package since that's the only place it is used.

Introduce an index constructor that takes multicodec code and
instantiates an index.

Fix ignored errors in `indexsorted.go` during marshalling/unmarshlling.

Rename index constructor functions to use consistent terminology; i.e.
`new` instead if `mk`.

Remove redundant TODOs in code.

Relates to:
- multiformats/go-multicodec#46

Address review comments

* Rename constructor of index by codec to a simpler name and update
docs.

* Use multicodec.Code as constant instead of wrappint unit64 every time.
  • Loading branch information
masih authored and mvdan committed Jul 16, 2021
1 parent 224cd8f commit 2b593c1
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 98 deletions.
86 changes: 48 additions & 38 deletions v2/index/insertionindex.go → v2/blockstore/insertionindex.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
package index
package blockstore

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"

"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/petar/GoLLRB/llrb"
cbor "github.com/whyrusleeping/cbor/go"
)

type InsertionIndex struct {
items llrb.LLRB
}
var (
errUnsupported = errors.New("not supported")
insertionIndexCodec = multicodec.Code(0x300003)
)

func (ii *InsertionIndex) InsertNoReplace(key cid.Cid, n uint64) {
ii.items.InsertNoReplace(mkRecordFromCid(key, n))
}
type (
insertionIndex struct {
items llrb.LLRB
}

type recordDigest struct {
digest []byte
Record
}
recordDigest struct {
digest []byte
index.Record
}
)

func (r recordDigest) Less(than llrb.Item) bool {
other, ok := than.(recordDigest)
Expand All @@ -33,7 +40,7 @@ func (r recordDigest) Less(than llrb.Item) bool {
return bytes.Compare(r.digest, other.digest) < 0
}

func mkRecord(r Record) recordDigest {
func newRecordDigest(r index.Record) recordDigest {
d, err := multihash.Decode(r.Hash())
if err != nil {
panic(err)
Expand All @@ -42,24 +49,28 @@ func mkRecord(r Record) recordDigest {
return recordDigest{d.Digest, r}
}

func mkRecordFromCid(c cid.Cid, at uint64) recordDigest {
func newRecordFromCid(c cid.Cid, at uint64) recordDigest {
d, err := multihash.Decode(c.Hash())
if err != nil {
panic(err)
}

return recordDigest{d.Digest, Record{Cid: c, Idx: at}}
return recordDigest{d.Digest, index.Record{Cid: c, Idx: at}}
}

func (ii *InsertionIndex) Get(c cid.Cid) (uint64, error) {
func (ii *insertionIndex) insertNoReplace(key cid.Cid, n uint64) {
ii.items.InsertNoReplace(newRecordFromCid(key, n))
}

func (ii *insertionIndex) Get(c cid.Cid) (uint64, error) {
d, err := multihash.Decode(c.Hash())
if err != nil {
return 0, err
}
entry := recordDigest{digest: d.Digest}
e := ii.items.Get(entry)
if e == nil {
return 0, ErrNotFound
return 0, index.ErrNotFound
}
r, ok := e.(recordDigest)
if !ok {
Expand All @@ -69,13 +80,11 @@ func (ii *InsertionIndex) Get(c cid.Cid) (uint64, error) {
return r.Record.Idx, nil
}

func (ii *InsertionIndex) Marshal(w io.Writer) error {
func (ii *insertionIndex) Marshal(w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, int64(ii.items.Len())); err != nil {
return err
}

var err error

iter := func(i llrb.Item) bool {
if err = cbor.Encode(w, i.(recordDigest).Record); err != nil {
return false
Expand All @@ -86,30 +95,29 @@ func (ii *InsertionIndex) Marshal(w io.Writer) error {
return err
}

func (ii *InsertionIndex) Unmarshal(r io.Reader) error {
var len int64
if err := binary.Read(r, binary.LittleEndian, &len); err != nil {
func (ii *insertionIndex) Unmarshal(r io.Reader) error {
var length int64
if err := binary.Read(r, binary.LittleEndian, &length); err != nil {
return err
}
d := cbor.NewDecoder(r)
for i := int64(0); i < len; i++ {
var rec Record
for i := int64(0); i < length; i++ {
var rec index.Record
if err := d.Decode(&rec); err != nil {
return err
}
ii.items.InsertNoReplace(mkRecord(rec))
ii.items.InsertNoReplace(newRecordDigest(rec))
}
return nil
}

// Codec identifies this index format
func (ii *InsertionIndex) Codec() Codec {
return IndexInsertion
func (ii *insertionIndex) Codec() multicodec.Code {
return insertionIndexCodec
}

func (ii *InsertionIndex) Load(rs []Record) error {
func (ii *insertionIndex) Load(rs []index.Record) error {
for _, r := range rs {
rec := mkRecord(r)
rec := newRecordDigest(r)
if rec.digest == nil {
return fmt.Errorf("invalid entry: %v", r)
}
Expand All @@ -118,15 +126,17 @@ func (ii *InsertionIndex) Load(rs []Record) error {
return nil
}

func mkInsertion() Index {
ii := InsertionIndex{}
return &ii
func newInsertionIndex() *insertionIndex {
return &insertionIndex{}
}

// Flatten returns a 'indexsorted' formatted index for more efficient subsequent loading
func (ii *InsertionIndex) Flatten() (Index, error) {
si := BuildersByCodec[IndexSorted]()
rcrds := make([]Record, ii.items.Len())
// flatten returns a 'indexsorted' formatted index for more efficient subsequent loading
func (ii *insertionIndex) flatten() (index.Index, error) {
si, err := index.New(multicodec.CarIndexSorted)
if err != nil {
return nil, err
}
rcrds := make([]index.Record, ii.items.Len())

idx := 0
iter := func(i llrb.Item) bool {
Expand All @@ -142,7 +152,7 @@ func (ii *InsertionIndex) Flatten() (Index, error) {
return si, nil
}

func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool {
func (ii *insertionIndex) hasExactCID(c cid.Cid) bool {
d, err := multihash.Decode(c.Hash())
if err != nil {
panic(err)
Expand Down
17 changes: 6 additions & 11 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type ReadWrite struct {
f *os.File
carV1Writer *internalio.OffsetWriteSeeker
ReadOnly
idx *index.InsertionIndex
idx *insertionIndex
header carv2.Header

dedupCids bool
Expand Down Expand Up @@ -119,16 +119,11 @@ func NewReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, err
}
}()

idxBuilder, ok := index.BuildersByCodec[index.IndexInsertion]
if !ok {
return nil, fmt.Errorf("unknownindex codec: %#v", index.IndexInsertion)
}

// Instantiate block store.
// Set the header fileld before applying options since padding options may modify header.
rwbs := &ReadWrite{
f: f,
idx: (idxBuilder()).(*index.InsertionIndex),
idx: newInsertionIndex(),
header: carv2.NewHeader(0),
}
for _, opt := range opts {
Expand Down Expand Up @@ -235,7 +230,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
if err != nil {
return err
}
b.idx.InsertNoReplace(c, uint64(frameOffset))
b.idx.insertNoReplace(c, uint64(frameOffset))

// Seek to the next frame by skipping the block.
// The frame length includes the CID, so subtract it.
Expand Down Expand Up @@ -270,15 +265,15 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {

for _, bl := range blks {
c := bl.Cid()
if b.dedupCids && b.idx.HasExactCID(c) {
if b.dedupCids && b.idx.hasExactCID(c) {
continue
}

n := uint64(b.carV1Writer.Position())
if err := util.LdWrite(b.carV1Writer, c.Bytes(), bl.RawData()); err != nil {
return err
}
b.idx.InsertNoReplace(c, n)
b.idx.insertNoReplace(c, n)
}
return nil
}
Expand All @@ -302,7 +297,7 @@ func (b *ReadWrite) Finalize() error {
defer b.Close()

// TODO if index not needed don't bother flattening it.
fi, err := b.idx.Flatten()
fi, err := b.idx.flatten()
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-varint v0.0.6
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9
Expand Down
2 changes: 2 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae h1:wfljHPpiR0UDOjeqld9ds0Zxl3Nt/j+0wnvyBc01JgY=
github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
Expand Down
8 changes: 2 additions & 6 deletions v2/index/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,5 @@ package index

import "errors"

var (
// ErrNotFound signals a record is not found in the index.
ErrNotFound = errors.New("not found")
// errUnsupported signals unsupported operation by an index.
errUnsupported = errors.New("not supported")
)
// ErrNotFound signals a record is not found in the index.
var ErrNotFound = errors.New("not found")
3 changes: 1 addition & 2 deletions v2/index/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ func Generate(v1 io.ReadSeeker) (Index, error) {
return nil, fmt.Errorf("error reading car header: %w", err)
}

// TODO: Generate should likely just take an io.ReadSeeker.
// TODO: ensure the input's header version is 1.

offset, err := carv1.HeaderSize(header)
if err != nil {
return nil, err
}

idx := mkSorted()
idx := newSorted()

records := make([]Record, 0)

Expand Down
48 changes: 21 additions & 27 deletions v2/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"os"

"github.com/multiformats/go-multicodec"

"github.com/multiformats/go-varint"

internalio "github.com/ipld/go-car/v2/internal/io"
Expand All @@ -16,22 +18,14 @@ import (

// Codec table is a first var-int in CAR indexes
const (
IndexSorted Codec = 0x0400 // as per https://github.com/multiformats/multicodec/pull/220

// TODO: unexport these before the final release, probably
IndexHashed Codec = 0x300000 + iota
IndexSingleSorted
IndexGobHashed
IndexInsertion
indexHashed codec = 0x300000 + iota
indexSingleSorted
indexGobHashed
)

type (
// Codec is used as a multicodec identifier for CAR index files
// TODO: use go-multicodec before the final release
Codec int

// Builder is a constructor for an index type
Builder func() Index
// codec is used as a multicodec identifier for CAR index files
codec int

// Record is a pre-processed record of a car item and location.
Record struct {
Expand All @@ -41,22 +35,22 @@ type (

// Index provides an interface for looking up byte offset of a given CID.
Index interface {
Codec() Codec
Codec() multicodec.Code
Marshal(w io.Writer) error
Unmarshal(r io.Reader) error
Get(cid.Cid) (uint64, error)
Load([]Record) error
}
)

// BuildersByCodec holds known index formats
// TODO: turn this into a func before the final release?
var BuildersByCodec = map[Codec]Builder{
IndexHashed: mkHashed,
IndexSorted: mkSorted,
IndexSingleSorted: mkSingleSorted,
IndexGobHashed: mkGobHashed,
IndexInsertion: mkInsertion,
// New constructs a new index corresponding to the given CAR index codec.
func New(codec multicodec.Code) (Index, error) {
switch codec {
case multicodec.CarIndexSorted:
return newSorted(), nil
default:
return nil, fmt.Errorf("unknwon index codec: %v", codec)
}
}

// Save writes a generated index into the given `path`.
Expand Down Expand Up @@ -97,15 +91,15 @@ func WriteTo(idx Index, w io.Writer) error {
// Returns error if the encoding is not known.
func ReadFrom(r io.Reader) (Index, error) {
reader := bufio.NewReader(r)
codec, err := varint.ReadUvarint(reader)
code, err := varint.ReadUvarint(reader)
if err != nil {
return nil, err
}
builder, ok := BuildersByCodec[Codec(codec)]
if !ok {
return nil, fmt.Errorf("unknown codec: %d", codec)
codec := multicodec.Code(code)
idx, err := New(codec)
if err != nil {
return nil, err
}
idx := builder()
if err := idx.Unmarshal(reader); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 2b593c1

Please sign in to comment.