Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

chunk, storage: chunk.Store multiple chunk Put #1681

Merged
merged 3 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (d *Descriptor) String() string {

type Store interface {
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error)
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
Expand All @@ -256,7 +256,7 @@ type Validator interface {
Validate(ch Chunk) bool
}

// ValidatorStore encapsulates Store by decorting the Put method
// ValidatorStore encapsulates Store by decorating the Put method
// with validators check.
type ValidatorStore struct {
Store
Expand All @@ -272,14 +272,25 @@ func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore)
}
}

// Put overrides Store put method with validators check. If one of the validators
// return true, the chunk is considered valid and Store Put method is called.
// If all validators return false, ErrChunkInvalid is returned.
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) {
// Put overrides Store put method with validators check. For Put to succeed,
// all provided chunks must be validated with true by one of the validators.
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error) {
for _, ch := range chs {
if !s.validate(ch) {
return nil, ErrChunkInvalid
}
}
return s.Store.Put(ctx, mode, chs...)
}

// validate returns true if one of the validators
// return true. If all validators return false,
// the chunk is considered invalid.
func (s *ValidatorStore) validate(ch Chunk) bool {
for _, v := range s.validators {
if v.Validate(ch) {
return s.Store.Put(ctx, mode, ch)
return true
}
}
return false, ErrChunkInvalid
return false
}
4 changes: 2 additions & 2 deletions network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
return nil, errors.New("roundRobinStore doesn't support Get")
}

func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) {
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
return rrs.stores[idx].Put(ctx, mode, ch)
return rrs.stores[idx].Put(ctx, mode, chs...)
}

func (rrs *roundRobinStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
Expand Down
13 changes: 9 additions & 4 deletions storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,17 @@ func NewMapChunkStore() *MapChunkStore {
}
}

func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, chs ...Chunk) ([]bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
_, exists := m.chunks[ch.Address().Hex()]
m.chunks[ch.Address().Hex()] = ch
return exists, nil

exist := make([]bool, len(chs))
for i, ch := range chs {
addr := ch.Address().Hex()
_, exist[i] = m.chunks[addr]
m.chunks[addr] = ch
}
return exist, nil
}

func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
Expand Down
2 changes: 1 addition & 1 deletion storage/hasherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
}()
seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
h.tag.Inc(chunk.StateStored)
if seen {
if err != nil && seen[0] {
h.tag.Inc(chunk.StateSeen)
}
select {
Expand Down
9 changes: 4 additions & 5 deletions storage/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// TestDB_collectGarbageWorker tests garbage collection runs
// by uploading and syncing a number of chunks.
func TestDB_collectGarbageWorker(t *testing.T) {
testDB_collectGarbageWorker(t)
testDBCollectGarbageWorker(t)
}

// TestDB_collectGarbageWorker_multipleBatches tests garbage
Expand All @@ -44,13 +44,12 @@ func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2

testDB_collectGarbageWorker(t)
testDBCollectGarbageWorker(t)
}

// testDB_collectGarbageWorker is a helper test function to test
// testDBCollectGarbageWorker is a helper test function to test
// garbage collection runs by uploading and syncing a number of chunks.
func testDB_collectGarbageWorker(t *testing.T) {
t.Helper()
func testDBCollectGarbageWorker(t *testing.T) {

chunkCount := 150

Expand Down
26 changes: 26 additions & 0 deletions storage/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ func generateTestRandomChunk() chunk.Chunk {
return chunk.NewChunk(key, data)
}

// generateTestRandomChunks generates a slice of random
// Chunks by using generateTestRandomChunk function.
func generateTestRandomChunks(count int) []chunk.Chunk {
chunks := make([]chunk.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = generateTestRandomChunk()
}
return chunks
}

// TestGenerateTestRandomChunk validates that
// generateTestRandomChunk returns random data by comparing
// two generated chunks.
Expand Down Expand Up @@ -219,6 +229,8 @@ func TestGenerateTestRandomChunk(t *testing.T) {
// chunk values are in the retrieval indexes.
func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
if err != nil {
t.Fatal(err)
Expand All @@ -238,6 +250,8 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
// chunk values are in the retrieval indexes when access time must be stored.
func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
Expand All @@ -258,6 +272,8 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, ac
// chunk values are in the pull index.
func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
BinID: binID,
Expand All @@ -275,6 +291,8 @@ func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) fun
// chunk values are in the push index.
func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.pushIndex.Get(shed.Item{
Address: ch.Address(),
StoreTimestamp: storeTimestamp,
Expand All @@ -292,6 +310,8 @@ func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError er
// chunk values are in the push index.
func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.gcIndex.Get(shed.Item{
Address: chunk.Address(),
BinID: binID,
Expand All @@ -308,6 +328,8 @@ func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp i
// an index contains expected number of key/value pairs.
func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

var c int
err := i.Iterate(func(item shed.Item) (stop bool, err error) {
c++
Expand All @@ -326,6 +348,8 @@ func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
// value is the same as the number of items in DB.gcIndex.
func newIndexGCSizeTest(db *DB) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

var want uint64
err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
want++
Expand Down Expand Up @@ -354,6 +378,8 @@ type testIndexChunk struct {
// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
// chunks will be sorted with it before validation.
func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFunc func(i, j int) (less bool)) {
t.Helper()

newItemsCountTest(i, len(chunks))(t)

if sortFunc != nil {
Expand Down
Loading