Skip to content

Commit

Permalink
Decode dagpb blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
burdiyan committed Sep 18, 2023
1 parent 9e44b07 commit c68e6a1
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 93 deletions.
2 changes: 1 addition & 1 deletion backend/hyper/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func NewChange(eid EntityID, deps []cid.Cid, ts hlc.Time, signer core.KeyPair, d
return hb, fmt.Errorf("failed to sign change: %w", err)
}

hb, err = EncodeBlob(ch.Type, ch)
hb, err = EncodeBlob(ch)
if err != nil {
return hb, err
}
Expand Down
1 change: 0 additions & 1 deletion backend/hyper/entity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func TestEntity(t *testing.T) {
})
require.NoError(t, err)

require.Equal(t, TypeChange, ch.Type)
require.Nil(t, ch.Decoded.(Change).Deps)

name, _ = e.Get("name")
Expand Down
164 changes: 114 additions & 50 deletions backend/hyper/hyper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbornode "github.com/ipfs/go-ipld-cbor"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.uber.org/zap"
)

Expand All @@ -26,16 +26,27 @@ type BlobType string
// Storage is an indexing blob storage.
type Storage struct {
db *sqlitex.Pool
bs *blockStore
bs *indexingBlockStore
log *zap.Logger

*indexer
}

// NewStorage creates a new blob storage.
func NewStorage(db *sqlitex.Pool, log *zap.Logger) *Storage {
return &Storage{
bs := newBlockstore(db)

idx := &indexer{
db: db,
bs: newBlockstore(db),
log: log,
bs: bs,
}

return &Storage{
db: db,
bs: &indexingBlockStore{blockStore: bs, indexBlob: idx.indexBlob},
log: log,
indexer: idx,
}
}

Expand Down Expand Up @@ -70,8 +81,10 @@ func (bs *Storage) SaveBlob(ctx context.Context, blob Blob) error {
}
defer release()

codec, hash := ipfs.DecodeCID(blob.CID)

return sqlitex.WithTx(conn, func() error {
id, exists, err := bs.bs.putBlock(conn, 0, uint64(blob.Codec), blob.Hash, blob.Data)
id, exists, err := bs.bs.putBlock(conn, 0, uint64(codec), hash, blob.Data)
if err != nil {
return err
}
Expand All @@ -81,7 +94,7 @@ func (bs *Storage) SaveBlob(ctx context.Context, blob Blob) error {
return nil
}

if err := bs.indexBlob(conn, id, blob); err != nil {
if err := bs.indexBlob(conn, id, blob.CID, blob.Decoded); err != nil {
return fmt.Errorf("failed to index blob %s: %w", blob.CID, err)
}

Expand Down Expand Up @@ -122,8 +135,10 @@ func (bs *Storage) SaveDraftBlob(ctx context.Context, eid EntityID, blob Blob) e
}
defer release()

codec, hash := ipfs.DecodeCID(blob.CID)

return sqlitex.WithTx(conn, func() error {
id, exists, err := bs.bs.putBlock(conn, 0, uint64(blob.Codec), blob.Hash, blob.Data)
id, exists, err := bs.bs.putBlock(conn, 0, uint64(codec), hash, blob.Data)
if err != nil {
return err
}
Expand All @@ -133,7 +148,7 @@ func (bs *Storage) SaveDraftBlob(ctx context.Context, eid EntityID, blob Blob) e
return nil
}

if err := bs.indexBlob(conn, id, blob); err != nil {
if err := bs.indexBlob(conn, id, blob.CID, blob.Decoded); err != nil {
return fmt.Errorf("failed to index blob %s: %w", blob.CID, err)
}

Expand Down Expand Up @@ -283,7 +298,9 @@ func (bs *Storage) ReplaceDraftBlob(ctx context.Context, eid EntityID, old cid.C
return err
}

id, exists, err := bs.bs.putBlock(conn, oldid, uint64(blob.Codec), blob.Hash, blob.Data)
codec, hash := ipfs.DecodeCID(blob.CID)

id, exists, err := bs.bs.putBlock(conn, oldid, uint64(codec), hash, blob.Data)
if err != nil {
return fmt.Errorf("replace draft blob error when insert: %w", err)
}
Expand All @@ -293,7 +310,7 @@ func (bs *Storage) ReplaceDraftBlob(ctx context.Context, eid EntityID, old cid.C
return nil
}

if err := bs.indexBlob(conn, id, blob); err != nil {
if err := bs.indexBlob(conn, id, blob.CID, blob.Decoded); err != nil {
return fmt.Errorf("failed to index blob %s: %w", blob.CID, err)
}

Expand Down Expand Up @@ -343,31 +360,23 @@ func (bs *Storage) IPFSBlockstore() blockstore.Blockstore {

// Blob is a structural artifact.
type Blob struct {
Type BlobType
CID cid.Cid
Codec multicodec.Code
Hash multihash.Multihash
Data []byte
Decoded any
}

// EncodeBlob produces a Blob from any object.
func EncodeBlob(t BlobType, v any) (hb Blob, err error) {
func EncodeBlob(v any) (hb Blob, err error) {
data, err := cbornode.DumpObject(v)
if err != nil {
return hb, fmt.Errorf("failed to encode blob with type %s: %w", t, err)
return hb, fmt.Errorf("failed to encode blob %T: %w", v, err)
}

codec := multicodec.DagCbor

blk := ipfs.NewBlock(uint64(codec), data)
blk := ipfs.NewBlock(uint64(multicodec.DagCbor), data)
c := blk.Cid()

return Blob{
Type: t,
CID: c,
Codec: codec,
Hash: c.Hash(),
Data: data,
Decoded: v,
}, nil
Expand All @@ -379,44 +388,99 @@ var errNotHyperBlob = errors.New("not a hyper blob")
func DecodeBlob(c cid.Cid, data []byte) (hb Blob, err error) {
codec := c.Prefix().Codec

if codec != uint64(multicodec.DagCbor) {
return hb, fmt.Errorf("%s: %w", c, errNotHyperBlob)
}

var v struct {
Type string `cbor:"@type"`
}
if err := cbor.Unmarshal(data, &v); err != nil {
var vv any
if err := cbornode.DecodeInto(data, &vv); err != nil {
panic(err)
switch multicodec.Code(codec) {
case multicodec.DagPb:
b := dagpb.Type.PBNode.NewBuilder()
if err := dagpb.DecodeBytes(b, data); err != nil {
return hb, fmt.Errorf("failed to decode dagpb node %s: %w", c, err)
}

return hb, fmt.Errorf("failed to infer hyper blob %s: %w", c, err)
}

switch BlobType(v.Type) {
case TypeKeyDelegation:
var v KeyDelegation
if err := cbornode.DecodeInto(data, &v); err != nil {
return hb, err
hb.Decoded = b.Build()
case multicodec.DagCbor:
var v struct {
Type string `cbor:"@type"`
}
hb.Decoded = v
case TypeChange:
var v Change
if err := cbornode.DecodeInto(data, &v); err != nil {
return hb, err
if err := cbor.Unmarshal(data, &v); err != nil {
return hb, fmt.Errorf("failed to infer hyper blob %s: %w", c, err)
}

switch BlobType(v.Type) {
case TypeKeyDelegation:
var v KeyDelegation
if err := cbornode.DecodeInto(data, &v); err != nil {
return hb, err
}
hb.Decoded = v
case TypeChange:
var v Change
if err := cbornode.DecodeInto(data, &v); err != nil {
return hb, err
}
hb.Decoded = v
default:
return hb, fmt.Errorf("unknown hyper blob type: '%s'", v.Type)
}
hb.Decoded = v
default:
return hb, fmt.Errorf("unknown hyper blob type: '%s'", v.Type)
return hb, fmt.Errorf("%s: %w", c, errNotHyperBlob)
}

hb.Type = BlobType(v.Type)
hb.CID = c
hb.Codec = multicodec.Code(codec)
hb.Hash = c.Hash()
hb.Data = data

return hb, nil
}

type indexingBlockStore struct {
*blockStore
indexBlob func(conn *sqlite.Conn, id int64, c cid.Cid, blob any) error
}

func (b *indexingBlockStore) Put(ctx context.Context, block blocks.Block) error {
// conn, release, err := b.db.Conn(ctx)
// if err != nil {
// return err
// }
// defer release()

// return sqlitex.WithTx(conn, func() error {
// codec, hash := ipfs.DecodeCID(block.Cid())
// id, exists, err := b.putBlock(conn, 0, codec, hash, block.RawData())
// if err != nil {
// return err
// }

// if exists {
// return nil
// }

// hb, err := DecodeBlob(block.Cid(), block.RawData())
// if err != nil {
// return err
// }

// return b.indexBlob(conn, id, hb.CID, hb.Decoded)
// })

return b.blockStore.Put(ctx, block)
}

// PutMany implements blockstore.Blockstore interface.
func (b *indexingBlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
// conn, release, err := b.db.Conn(ctx)
// if err != nil {
// return err
// }
// defer release()

// return sqlitex.WithTx(conn, func() error {
// for _, blk := range blocks {
// codec, hash := ipfs.DecodeCID(blk.Cid())
// if _, _, err := b.putBlock(conn, 0, codec, hash, blk.RawData()); err != nil {
// return err
// }
// }
// return nil
// })

return b.blockStore.PutMany(ctx, blocks)
}
Loading

0 comments on commit c68e6a1

Please sign in to comment.