Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

chunk, shed, storage: chunk.Store GetMulti method #1691

Merged
merged 1 commit into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (d *Descriptor) String() string {

type Store interface {
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
GetMulti(ctx context.Context, mode ModeGet, addrs ...Address) (ch []Chunk, err error)
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
Expand Down
4 changes: 4 additions & 0 deletions network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
return nil, errors.New("roundRobinStore doesn't support Get")
}

func (rrs *roundRobinStore) GetMulti(_ context.Context, _ chunk.ModeGet, _ ...storage.Address) ([]storage.Chunk, error) {
return nil, errors.New("roundRobinStore doesn't support GetMulti")
}

func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
Expand Down
31 changes: 31 additions & 0 deletions shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,37 @@ func (f Index) Get(keyFields Item) (out Item, err error) {
return out.Merge(keyFields), nil
}

// Fill populates fields on provided items that are part of the
// encoded value by getting them based on information passed in their
// fields. Every item must have all fields needed for encoding the
// key set. The passed slice items will be changed so that they
// contain data from the index values. No new slice is allocated.
// This function uses a single leveldb snapshot.
func (f Index) Fill(items []Item) (err error) {
snapshot, err := f.db.ldb.GetSnapshot()
if err != nil {
return err
}
defer snapshot.Release()

for i, item := range items {
key, err := f.encodeKeyFunc(item)
if err != nil {
return err
}
value, err := snapshot.Get(key, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this assumes that all requested chunks are in the DB. Does this account for the case where GC was run in between and value is not found in the db? only by returning an error right? I guess this is also a feasible approach

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the chunk is missing the error will be returned.

As the snapshot is acquired, gc will not influence this result if it runs while this function is called. The chunk will be returned in that case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool 👍 let's keep it so then. I think for now it would also make error handling more clear and easy on the calling context

if err != nil {
return err
}
v, err := f.decodeValueFunc(item, value)
if err != nil {
return err
}
items[i] = v.Merge(item)
}
return nil
}

// Has accepts key fields represented as Item to check
// if there this Item's encoded key is stored in
// the index.
Expand Down
64 changes: 63 additions & 1 deletion shed/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var retrievalIndexFuncs = IndexFuncs{
},
}

// TestIndex validates put, get, has and delete functions of the Index implementation.
// TestIndex validates put, get, fill, has and delete functions of the Index implementation.
func TestIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()
Expand Down Expand Up @@ -283,6 +283,68 @@ func TestIndex(t *testing.T) {
t.Fatalf("got error %v, want %v", err, wantErr)
}
})

t.Run("fill", func(t *testing.T) {
want := []Item{
{
Address: []byte("put-hash-1"),
Data: []byte("DATA123"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
{
Address: []byte("put-hash-32"),
Data: []byte("DATA124"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
{
Address: []byte("put-hash-42"),
Data: []byte("DATA125"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
{
Address: []byte("put-hash-71"),
Data: []byte("DATA126"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
}

for _, item := range want {
err := index.Put(item)
if err != nil {
t.Fatal(err)
}
}
items := make([]Item, len(want))
for i, w := range want {
items[i] = Item{
Address: w.Address,
}
}
err = index.Fill(items)
if err != nil {
t.Fatal(err)
}
for i := range items {
checkItem(t, items[i], want[i])
}

t.Run("not found", func(t *testing.T) {
items := make([]Item, len(want))
for i, w := range want {
items[i] = Item{
Address: w.Address,
}
}
items = append(items, Item{
Address: []byte("put-hash-missing"),
})
want := leveldb.ErrNotFound
err := index.Fill(items)
if err != want {
t.Errorf("got error %v, want %v", err, want)
}
})
})
}

// TestIndex_Iterate validates index Iterate
Expand Down
13 changes: 13 additions & 0 deletions storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,19 @@ func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Ch
return chunk, nil
}

func (m *MapChunkStore) GetMulti(_ context.Context, _ chunk.ModeGet, refs ...Address) (chunks []Chunk, err error) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, ref := range refs {
chunk := m.chunks[ref.Hex()]
if chunk == nil {
return nil, ErrChunkNotFound
}
chunks = append(chunks, chunk)
}
return chunks, nil
}

// Need to implement Has from SyncChunkStore
func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err error) {
m.mu.RLock()
Expand Down
9 changes: 9 additions & 0 deletions storage/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ func generateTestRandomChunks(count int) []chunk.Chunk {
return chunks
}

// chunkAddresses return chunk addresses of provided chunks.
func chunkAddresses(chunks []chunk.Chunk) []chunk.Address {
addrs := make([]chunk.Address, len(chunks))
for i, ch := range chunks {
addrs[i] = ch.Address()
}
return addrs
}

// TestGenerateTestRandomChunk validates that
// generateTestRandomChunk returns random data by comparing
// two generated chunks.
Expand Down
65 changes: 37 additions & 28 deletions storage/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,34 +67,7 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer id full
db.updateGCSem <- struct{}{}
}
db.updateGCWG.Add(1)
go func() {
defer db.updateGCWG.Done()
if db.updateGCSem != nil {
// free a spot in updateGCSem buffer
// for a new goroutine
defer func() { <-db.updateGCSem }()
}

metricName := "localstore.updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

err := db.updateGC(out)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
if testHookUpdateGC != nil {
testHookUpdateGC()
}
}()
db.updateGCItems(out)

case chunk.ModeGetPin:
pinnedItem, err := db.pinIndex.Get(item)
Expand All @@ -112,6 +85,42 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
return out, nil
}

// updateGCItems is called when ModeGetRequest is used
// for Get or GetMulti to update access time and gc indexes
// for all returned chunks.
func (db *DB) updateGCItems(items ...shed.Item) {
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer id full
db.updateGCSem <- struct{}{}
}
db.updateGCWG.Add(1)
go func() {
defer db.updateGCWG.Done()
if db.updateGCSem != nil {
// free a spot in updateGCSem buffer
// for a new goroutine
defer func() { <-db.updateGCSem }()
}

metricName := "localstore.updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

for _, item := range items {
err := db.updateGC(item)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
}
// if gc update hook is defined, call it
if testHookUpdateGC != nil {
testHookUpdateGC()
}
}()
}

// updateGC updates garbage collection index for
// a single item. Provided item is expected to have
// only Address and Data fields with non zero values,
Expand Down
91 changes: 91 additions & 0 deletions storage/localstore/mode_get_multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2019 The Swarm Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)

// GetMulti returns chunks from the database. If one of the chunks is not found
// chunk.ErrChunkNotFound will be returned. All required indexes will be updated
// required by the Getter Mode. GetMulti is required to implement chunk.Store
// interface.
func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore.GetMulti.%s", mode)

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
}()

out, err := db.getMulti(mode, addrs...)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
}
return nil, err
}
chunks = make([]chunk.Chunk, len(out))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there might be cases where len(out) < len(addrs), this in the calling context must be checked and somehow handled, and so I am wondering if it is not better to have chunks := make([]chunk.Chunk, len(addrs) and within this slice pad the chunks of set addr which were not found with nil values, that is to allow the calling context to verify which chunks were not found with time complexity of O(1) rather than O(n) in this case.
I think this might be a premature optimization and maybe we could consider to have this as a future iteration and so I won't block on this, but would be happy to have @zelig's opinion on this (and everyone else that feels like commenting on this)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @acud. Actually, len(out) is equal to len(addrs) as out is constructed on line 63 in getMulti with len(addrs) as length.

I like your idea of passing nils for not found chunks. I am also interested in other opinions, as this would require changing shed.Index.Fill to handle []*Item instead []Item, but Item is used with value semantics.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I missed the fact they are of equal length :)
See my remark on Fill in this context

for i, ch := range out {
chunks[i] = chunk.NewChunk(ch.Address, ch.Data).WithPinCounter(ch.PinCounter)
}
return chunks, nil
}

// getMulti returns Items from the retrieval index
// and updates other indexes.
func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) {
out = make([]shed.Item, len(addrs))
for i, addr := range addrs {
out[i].Address = addr
}

err = db.retrievalDataIndex.Fill(out)
if err != nil {
return nil, err
}

switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
db.updateGCItems(out...)

case chunk.ModeGetPin:
err := db.pinIndex.Fill(out)
if err != nil {
return nil, err
}

// no updates to indexes
case chunk.ModeGetSync:
case chunk.ModeGetLookup:
default:
return out, ErrInvalidMode
}
return out, nil
}
Loading