From 1f1586bb75b65b42d49b698361613d434a8b949b Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 26 Aug 2019 10:59:37 +0200 Subject: [PATCH] chunk, shed, storage: chunk.Store GetMulti method --- chunk/chunk.go | 1 + network/stream/common_test.go | 4 + shed/index.go | 31 ++++++++ shed/index_test.go | 64 +++++++++++++++- storage/common_test.go | 13 ++++ storage/localstore/localstore_test.go | 9 +++ storage/localstore/mode_get.go | 65 +++++++++------- storage/localstore/mode_get_multi.go | 91 +++++++++++++++++++++++ storage/localstore/mode_get_multi_test.go | 82 ++++++++++++++++++++ storage/localstore/mode_get_test.go | 40 ++++++++++ storage/types.go | 4 + 11 files changed, 375 insertions(+), 29 deletions(-) create mode 100644 storage/localstore/mode_get_multi.go create mode 100644 storage/localstore/mode_get_multi_test.go diff --git a/chunk/chunk.go b/chunk/chunk.go index d8fbaccc12..0ca9439ab5 100644 --- a/chunk/chunk.go +++ b/chunk/chunk.go @@ -243,6 +243,7 @@ func (d *Descriptor) String() string { type Store interface { Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error) + GetMulti(ctx context.Context, mode ModeGet, addrs ...Address) (ch []Chunk, err error) Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error) Has(ctx context.Context, addr Address) (yes bool, err error) Set(ctx context.Context, mode ModeSet, addr Address) (err error) diff --git a/network/stream/common_test.go b/network/stream/common_test.go index b5c15cd7c4..552579b827 100644 --- a/network/stream/common_test.go +++ b/network/stream/common_test.go @@ -227,6 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad return nil, errors.New("roundRobinStore doesn't support Get") } +func (rrs *roundRobinStore) GetMulti(_ context.Context, _ chunk.ModeGet, _ ...storage.Address) ([]storage.Chunk, error) { + return nil, errors.New("roundRobinStore doesn't support GetMulti") +} + func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) diff --git a/shed/index.go b/shed/index.go index 7b01247fbc..5f32752d40 100644 --- a/shed/index.go +++ b/shed/index.go @@ -147,6 +147,37 @@ func (f Index) Get(keyFields Item) (out Item, err error) { return out.Merge(keyFields), nil } +// Fill populates fields on provided items that are part of the +// encoded value by getting them based on information passed in their +// fields. Every item must have all fields needed for encoding the +// key set. The passed slice items will be changed so that they +// contain data from the index values. No new slice is allocated. +// This function uses a single leveldb snapshot. +func (f Index) Fill(items []Item) (err error) { + snapshot, err := f.db.ldb.GetSnapshot() + if err != nil { + return err + } + defer snapshot.Release() + + for i, item := range items { + key, err := f.encodeKeyFunc(item) + if err != nil { + return err + } + value, err := snapshot.Get(key, nil) + if err != nil { + return err + } + v, err := f.decodeValueFunc(item, value) + if err != nil { + return err + } + items[i] = v.Merge(item) + } + return nil +} + // Has accepts key fields represented as Item to check // if there this Item's encoded key is stored in // the index. diff --git a/shed/index_test.go b/shed/index_test.go index de181fa41a..27c0885d98 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -49,7 +49,7 @@ var retrievalIndexFuncs = IndexFuncs{ }, } -// TestIndex validates put, get, has and delete functions of the Index implementation. +// TestIndex validates put, get, fill, has and delete functions of the Index implementation. func TestIndex(t *testing.T) { db, cleanupFunc := newTestDB(t) defer cleanupFunc() @@ -283,6 +283,68 @@ func TestIndex(t *testing.T) { t.Fatalf("got error %v, want %v", err, wantErr) } }) + + t.Run("fill", func(t *testing.T) { + want := []Item{ + { + Address: []byte("put-hash-1"), + Data: []byte("DATA123"), + StoreTimestamp: time.Now().UTC().UnixNano(), + }, + { + Address: []byte("put-hash-32"), + Data: []byte("DATA124"), + StoreTimestamp: time.Now().UTC().UnixNano(), + }, + { + Address: []byte("put-hash-42"), + Data: []byte("DATA125"), + StoreTimestamp: time.Now().UTC().UnixNano(), + }, + { + Address: []byte("put-hash-71"), + Data: []byte("DATA126"), + StoreTimestamp: time.Now().UTC().UnixNano(), + }, + } + + for _, item := range want { + err := index.Put(item) + if err != nil { + t.Fatal(err) + } + } + items := make([]Item, len(want)) + for i, w := range want { + items[i] = Item{ + Address: w.Address, + } + } + err = index.Fill(items) + if err != nil { + t.Fatal(err) + } + for i := range items { + checkItem(t, items[i], want[i]) + } + + t.Run("not found", func(t *testing.T) { + items := make([]Item, len(want)) + for i, w := range want { + items[i] = Item{ + Address: w.Address, + } + } + items = append(items, Item{ + Address: []byte("put-hash-missing"), + }) + want := leveldb.ErrNotFound + err := index.Fill(items) + if err != want { + t.Errorf("got error %v, want %v", err, want) + } + }) + }) } // TestIndex_Iterate validates index Iterate diff --git a/storage/common_test.go b/storage/common_test.go index 7b151ceed4..ecde8e7ad2 100644 --- a/storage/common_test.go +++ b/storage/common_test.go @@ -248,6 +248,19 @@ func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Ch return chunk, nil } +func (m *MapChunkStore) GetMulti(_ context.Context, _ chunk.ModeGet, refs ...Address) (chunks []Chunk, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, ref := range refs { + chunk := m.chunks[ref.Hex()] + if chunk == nil { + return nil, ErrChunkNotFound + } + chunks = append(chunks, chunk) + } + return chunks, nil +} + // Need to implement Has from SyncChunkStore func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err error) { m.mu.RLock() diff --git a/storage/localstore/localstore_test.go b/storage/localstore/localstore_test.go index f431270a98..06fa28984c 100644 --- a/storage/localstore/localstore_test.go +++ b/storage/localstore/localstore_test.go @@ -195,6 +195,15 @@ func generateTestRandomChunks(count int) []chunk.Chunk { return chunks } +// chunkAddresses return chunk addresses of provided chunks. +func chunkAddresses(chunks []chunk.Chunk) []chunk.Address { + addrs := make([]chunk.Address, len(chunks)) + for i, ch := range chunks { + addrs[i] = ch.Address() + } + return addrs +} + // TestGenerateTestRandomChunk validates that // generateTestRandomChunk returns random data by comparing // two generated chunks. diff --git a/storage/localstore/mode_get.go b/storage/localstore/mode_get.go index 1ded4ec3b2..bccebd4dd8 100644 --- a/storage/localstore/mode_get.go +++ b/storage/localstore/mode_get.go @@ -67,34 +67,7 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er switch mode { // update the access timestamp and gc index case chunk.ModeGetRequest: - if db.updateGCSem != nil { - // wait before creating new goroutines - // if updateGCSem buffer id full - db.updateGCSem <- struct{}{} - } - db.updateGCWG.Add(1) - go func() { - defer db.updateGCWG.Done() - if db.updateGCSem != nil { - // free a spot in updateGCSem buffer - // for a new goroutine - defer func() { <-db.updateGCSem }() - } - - metricName := "localstore.updateGC" - metrics.GetOrRegisterCounter(metricName, nil).Inc(1) - defer totalTimeMetric(metricName, time.Now()) - - err := db.updateGC(out) - if err != nil { - metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) - log.Error("localstore update gc", "err", err) - } - // if gc update hook is defined, call it - if testHookUpdateGC != nil { - testHookUpdateGC() - } - }() + db.updateGCItems(out) case chunk.ModeGetPin: pinnedItem, err := db.pinIndex.Get(item) @@ -112,6 +85,42 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er return out, nil } +// updateGCItems is called when ModeGetRequest is used +// for Get or GetMulti to update access time and gc indexes +// for all returned chunks. +func (db *DB) updateGCItems(items ...shed.Item) { + if db.updateGCSem != nil { + // wait before creating new goroutines + // if updateGCSem buffer id full + db.updateGCSem <- struct{}{} + } + db.updateGCWG.Add(1) + go func() { + defer db.updateGCWG.Done() + if db.updateGCSem != nil { + // free a spot in updateGCSem buffer + // for a new goroutine + defer func() { <-db.updateGCSem }() + } + + metricName := "localstore.updateGC" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + for _, item := range items { + err := db.updateGC(item) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + log.Error("localstore update gc", "err", err) + } + } + // if gc update hook is defined, call it + if testHookUpdateGC != nil { + testHookUpdateGC() + } + }() +} + // updateGC updates garbage collection index for // a single item. Provided item is expected to have // only Address and Data fields with non zero values, diff --git a/storage/localstore/mode_get_multi.go b/storage/localstore/mode_get_multi.go new file mode 100644 index 0000000000..c1f12249c3 --- /dev/null +++ b/storage/localstore/mode_get_multi.go @@ -0,0 +1,91 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/shed" + "github.com/syndtr/goleveldb/leveldb" +) + +// GetMulti returns chunks from the database. If one of the chunks is not found +// chunk.ErrChunkNotFound will be returned. All required indexes will be updated +// required by the Getter Mode. GetMulti is required to implement chunk.Store +// interface. +func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) { + metricName := fmt.Sprintf("localstore.GetMulti.%s", mode) + + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + defer func() { + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + }() + + out, err := db.getMulti(mode, addrs...) + if err != nil { + if err == leveldb.ErrNotFound { + return nil, chunk.ErrChunkNotFound + } + return nil, err + } + chunks = make([]chunk.Chunk, len(out)) + for i, ch := range out { + chunks[i] = chunk.NewChunk(ch.Address, ch.Data).WithPinCounter(ch.PinCounter) + } + return chunks, nil +} + +// getMulti returns Items from the retrieval index +// and updates other indexes. +func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) { + out = make([]shed.Item, len(addrs)) + for i, addr := range addrs { + out[i].Address = addr + } + + err = db.retrievalDataIndex.Fill(out) + if err != nil { + return nil, err + } + + switch mode { + // update the access timestamp and gc index + case chunk.ModeGetRequest: + db.updateGCItems(out...) + + case chunk.ModeGetPin: + err := db.pinIndex.Fill(out) + if err != nil { + return nil, err + } + + // no updates to indexes + case chunk.ModeGetSync: + case chunk.ModeGetLookup: + default: + return out, ErrInvalidMode + } + return out, nil +} diff --git a/storage/localstore/mode_get_multi_test.go b/storage/localstore/mode_get_multi_test.go new file mode 100644 index 0000000000..6ad7e5af08 --- /dev/null +++ b/storage/localstore/mode_get_multi_test.go @@ -0,0 +1,82 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "context" + "reflect" + "testing" + + "github.com/ethersphere/swarm/chunk" +) + +// TestModeGetMulti stores chunks and validates that GetMulti +// is returning them correctly. +func TestModeGetMulti(t *testing.T) { + const chunkCount = 10 + + for _, mode := range []chunk.ModeGet{ + chunk.ModeGetRequest, + chunk.ModeGetSync, + chunk.ModeGetLookup, + chunk.ModeGetPin, + } { + t.Run(mode.String(), func(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunks := generateTestRandomChunks(chunkCount) + + _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...) + if err != nil { + t.Fatal(err) + } + + if mode == chunk.ModeGetPin { + // pin chunks so that it is not returned as not found by pinIndex + for i, ch := range chunks { + err := db.Set(context.Background(), chunk.ModeSetPin, ch.Address()) + if err != nil { + t.Fatal(err) + } + chunks[i] = ch.WithPinCounter(1) + } + } + + addrs := chunkAddresses(chunks) + + got, err := db.GetMulti(context.Background(), mode, addrs...) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < chunkCount; i++ { + if !reflect.DeepEqual(got[i], chunks[i]) { + t.Errorf("got %v chunk %v, want %v", i, got[i], chunks[i]) + } + } + + missingChunk := generateTestRandomChunk() + + want := chunk.ErrChunkNotFound + _, err = db.GetMulti(context.Background(), mode, append(addrs, missingChunk.Address())...) + if err != want { + t.Errorf("got error %v, want %v", err, want) + } + }) + } +} diff --git a/storage/localstore/mode_get_test.go b/storage/localstore/mode_get_test.go index cea84b0970..82c9bd09e7 100644 --- a/storage/localstore/mode_get_test.go +++ b/storage/localstore/mode_get_test.go @@ -135,6 +135,31 @@ func TestModeGetRequest(t *testing.T) { t.Run("gc size", newIndexGCSizeTest(db)) }) + + t.Run("multi", func(t *testing.T) { + got, err := db.GetMulti(context.Background(), chunk.ModeGetRequest, ch.Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + + if !bytes.Equal(got[0].Address(), ch.Address()) { + t.Errorf("got chunk address %x, want %x", got[0].Address(), ch.Address()) + } + + if !bytes.Equal(got[0].Data(), ch.Data()) { + t.Errorf("got chunk data %x, want %x", got[0].Data(), ch.Data()) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp)) + + t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) + }) } // TestModeGetSync validates ModeGetSync index values on the provided DB. @@ -172,6 +197,21 @@ func TestModeGetSync(t *testing.T) { t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) t.Run("gc size", newIndexGCSizeTest(db)) + + t.Run("multi", func(t *testing.T) { + got, err := db.GetMulti(context.Background(), chunk.ModeGetSync, ch.Address()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(got[0].Address(), ch.Address()) { + t.Errorf("got chunk address %x, want %x", got[0].Address(), ch.Address()) + } + + if !bytes.Equal(got[0].Data(), ch.Data()) { + t.Errorf("got chunk data %x, want %x", got[0].Data(), ch.Data()) + } + }) } // setTestHookUpdateGC sets testHookUpdateGC and diff --git a/storage/types.go b/storage/types.go index 40d93ec479..c13c89f902 100644 --- a/storage/types.go +++ b/storage/types.go @@ -232,6 +232,10 @@ func (f *FakeChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (C panic("FakeChunkStore doesn't support Get") } +func (f *FakeChunkStore) GetMulti(_ context.Context, _ chunk.ModeGet, refs ...Address) ([]Chunk, error) { + panic("FakeChunkStore doesn't support GetMulti") +} + func (f *FakeChunkStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) { panic("FakeChunkStore doesn't support Set") }