Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
chunk, storage: chunk.Store multiple chunk Put (#1681)
Browse files Browse the repository at this point in the history
  • Loading branch information
janos authored Aug 22, 2019
1 parent a0aefcf commit 5e982b6
Show file tree
Hide file tree
Showing 10 changed files with 578 additions and 310 deletions.
27 changes: 19 additions & 8 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (d *Descriptor) String() string {

type Store interface {
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error)
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
Expand All @@ -256,7 +256,7 @@ type Validator interface {
Validate(ch Chunk) bool
}

// ValidatorStore encapsulates Store by decorting the Put method
// ValidatorStore encapsulates Store by decorating the Put method
// with validators check.
type ValidatorStore struct {
Store
Expand All @@ -272,14 +272,25 @@ func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore)
}
}

// Put overrides Store put method with validators check. If one of the validators
// return true, the chunk is considered valid and Store Put method is called.
// If all validators return false, ErrChunkInvalid is returned.
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) {
// Put overrides Store put method with validators check. For Put to succeed,
// all provided chunks must be validated with true by one of the validators.
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error) {
for _, ch := range chs {
if !s.validate(ch) {
return nil, ErrChunkInvalid
}
}
return s.Store.Put(ctx, mode, chs...)
}

// validate returns true if one of the validators
// return true. If all validators return false,
// the chunk is considered invalid.
func (s *ValidatorStore) validate(ch Chunk) bool {
for _, v := range s.validators {
if v.Validate(ch) {
return s.Store.Put(ctx, mode, ch)
return true
}
}
return false, ErrChunkInvalid
return false
}
4 changes: 2 additions & 2 deletions network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
return nil, errors.New("roundRobinStore doesn't support Get")
}

func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) {
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
return rrs.stores[idx].Put(ctx, mode, ch)
return rrs.stores[idx].Put(ctx, mode, chs...)
}

func (rrs *roundRobinStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
Expand Down
13 changes: 9 additions & 4 deletions storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,17 @@ func NewMapChunkStore() *MapChunkStore {
}
}

func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, chs ...Chunk) ([]bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
_, exists := m.chunks[ch.Address().Hex()]
m.chunks[ch.Address().Hex()] = ch
return exists, nil

exist := make([]bool, len(chs))
for i, ch := range chs {
addr := ch.Address().Hex()
_, exist[i] = m.chunks[addr]
m.chunks[addr] = ch
}
return exist, nil
}

func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
Expand Down
2 changes: 1 addition & 1 deletion storage/hasherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
}()
seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
h.tag.Inc(chunk.StateStored)
if seen {
if err != nil && seen[0] {
h.tag.Inc(chunk.StateSeen)
}
select {
Expand Down
9 changes: 4 additions & 5 deletions storage/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// TestDB_collectGarbageWorker tests garbage collection runs
// by uploading and syncing a number of chunks.
func TestDB_collectGarbageWorker(t *testing.T) {
testDB_collectGarbageWorker(t)
testDBCollectGarbageWorker(t)
}

// TestDB_collectGarbageWorker_multipleBatches tests garbage
Expand All @@ -44,13 +44,12 @@ func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2

testDB_collectGarbageWorker(t)
testDBCollectGarbageWorker(t)
}

// testDB_collectGarbageWorker is a helper test function to test
// testDBCollectGarbageWorker is a helper test function to test
// garbage collection runs by uploading and syncing a number of chunks.
func testDB_collectGarbageWorker(t *testing.T) {
t.Helper()
func testDBCollectGarbageWorker(t *testing.T) {

chunkCount := 150

Expand Down
26 changes: 26 additions & 0 deletions storage/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ func generateTestRandomChunk() chunk.Chunk {
return chunk.NewChunk(key, data)
}

// generateTestRandomChunks generates a slice of random
// Chunks by using generateTestRandomChunk function.
func generateTestRandomChunks(count int) []chunk.Chunk {
chunks := make([]chunk.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = generateTestRandomChunk()
}
return chunks
}

// TestGenerateTestRandomChunk validates that
// generateTestRandomChunk returns random data by comparing
// two generated chunks.
Expand Down Expand Up @@ -219,6 +229,8 @@ func TestGenerateTestRandomChunk(t *testing.T) {
// chunk values are in the retrieval indexes.
func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
if err != nil {
t.Fatal(err)
Expand All @@ -238,6 +250,8 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
// chunk values are in the retrieval indexes when access time must be stored.
func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
Expand All @@ -258,6 +272,8 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, ac
// chunk values are in the pull index.
func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
BinID: binID,
Expand All @@ -275,6 +291,8 @@ func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) fun
// chunk values are in the push index.
func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.pushIndex.Get(shed.Item{
Address: ch.Address(),
StoreTimestamp: storeTimestamp,
Expand All @@ -292,6 +310,8 @@ func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError er
// chunk values are in the push index.
func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

item, err := db.gcIndex.Get(shed.Item{
Address: chunk.Address(),
BinID: binID,
Expand All @@ -308,6 +328,8 @@ func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp i
// an index contains expected number of key/value pairs.
func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

var c int
err := i.Iterate(func(item shed.Item) (stop bool, err error) {
c++
Expand All @@ -326,6 +348,8 @@ func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
// value is the same as the number of items in DB.gcIndex.
func newIndexGCSizeTest(db *DB) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

var want uint64
err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
want++
Expand Down Expand Up @@ -354,6 +378,8 @@ type testIndexChunk struct {
// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
// chunks will be sorted with it before validation.
func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFunc func(i, j int) (less bool)) {
t.Helper()

newItemsCountTest(i, len(chunks))(t)

if sortFunc != nil {
Expand Down
Loading

0 comments on commit 5e982b6

Please sign in to comment.